Draft
2 changes: 2 additions & 0 deletions NEWS.md
@@ -1,3 +1,5 @@
**07/01/2026:** Added `waterdata.chunk_granularity(...)` — a context manager to control how finely the OGC `waterdata` getters split multi-value requests. By default the getters chunk only as much as the server's ~8 KB URL limit forces (the fewest sub-requests). But because every sub-request paginates, splitting a large result further is usually quota-neutral, so a caller who *knows* their pull is large can opt into a finer split for smoother progress, more even concurrency, and a smaller unit of retry/resume: `with waterdata.chunk_granularity("high"): df, md = waterdata.get_daily(monitoring_location_id=many_sites)`. The level is one of `"low"`, `"medium"`, or `"high"` — typed as `waterdata.GranularityLevel` (a `typing.Literal`), so a type checker rejects any other value and an invalid string raises `ValueError` at the `with`. Each level caps how many sub-chunks a multi-value argument is split into — `"low"` / `"medium"` / `"high"` cap at 2 / 8 / 32. That ceiling is a granularity constant, deliberately independent of the fan-out concurrency (`API_USGS_CONCURRENT`): how finely a query splits is orthogonal to how many sub-requests run at once. Capping the aggressive end at 32 bounds the blast radius — an accidental `"high"` on a huge list can't explode into thousands of sub-requests. There is no "off" level: not entering the block *is* off. It is deliberately a scoped `with` block rather than an automatic behavior or an environment variable, since the library can't tell in advance whether a query is large — a short-window query might fit in a single page, where extra chunks would only burn quota. Also exported at the top level as `dataretrieval.chunk_granularity`.
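As a rough illustration of the caps described above, the sketch below estimates how many sub-requests a call becomes when its URL already fits the byte limit. `LEVEL_CAPS` and `planned_sub_requests` are hypothetical names used only for this example, not library API, and the byte limit can force more splits than this estimate.

```python
from math import prod

# Caps quoted in the entry above; the dict and function are illustrative only.
LEVEL_CAPS = {"low": 2, "medium": 8, "high": 32}


def planned_sub_requests(level: str, *axis_lengths: int) -> int:
    """Estimate sub-requests for a call whose URL already fits the byte limit."""
    if level not in LEVEL_CAPS:
        raise ValueError(f"level must be one of {tuple(LEVEL_CAPS)}; got {level!r}.")
    cap = LEVEL_CAPS[level]
    # Each multi-value argument is split into at most `cap` pieces, or one
    # value per piece when it has fewer values than the cap.
    per_axis = [min(n, cap) for n in axis_lengths]
    # With several multi-value arguments the per-argument counts multiply.
    return prod(per_axis)


print(planned_sub_requests("high", 500))    # 32
print(planned_sub_requests("medium", 5))    # 5
print(planned_sub_requests("low", 500, 3))  # 2 * 2 = 4
```

The last call shows why the cap is per argument rather than per request: two multi-value arguments at `"low"` already yield four sub-requests.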

**06/23/2026:** **Breaking change (1.2.0):** the minimum supported Python is now **3.10** (`requires-python = ">=3.10"`). 3.9 support was already effectively broken — the `waterdata` module's dependencies (`anyio`, the test stack) require 3.10+, and the `waterdata` test modules already skipped on <3.10. `anyio` is now declared as a direct dependency (it is imported directly by `waterdata`), and the CI/ruff/mypy targets move to 3.10. Also fully removed the deprecated `variable_info` metadata property: the `NWIS_Metadata` override only warned and returned `None` (it relied on the defunct `get_pmcodes`), and the `BaseMetadata` abstract is gone too since nothing implemented it — accessing `.variable_info` now raises `AttributeError`. `site_info` is unaffected.

**06/23/2026:** **Breaking change (1.2.0):** removed the `nadp` module and the deprecated `samples` module ahead of the 1.2.0 release. `nadp` was deprecated on 05/01/2026 — NADP is not a USGS data source, so retrieve NADP data directly from https://nadp.slh.wisc.edu/. The `samples.get_usgs_samples` shim (a deprecated forward to the modern getter) is gone; use `waterdata.get_samples()` instead. `import dataretrieval.nadp` / `import dataretrieval.samples` now raise `ModuleNotFoundError`.
8 changes: 8 additions & 0 deletions dataretrieval/__init__.py
@@ -44,6 +44,11 @@
URLTooLong,
)

# Chunk-granularity control (a context manager) and its level type. Defined with
# the chunker in ``dataretrieval.ogc.chunking``; surfaced here for a stable
# public path ``from dataretrieval import chunk_granularity``.
from dataretrieval.ogc.chunking import GranularityLevel, chunk_granularity

# Resumable chunk-interruption exceptions. They are defined in
# ``dataretrieval.ogc.interruptions`` rather than ``dataretrieval.exceptions``
# because they carry pandas/httpx state and a resumable ``ChunkedCall`` handle,
@@ -93,5 +98,8 @@
"ChunkInterrupted",
"QuotaExhausted",
"ServiceInterrupted",
# chunk-granularity control (defined in ogc.chunking)
"chunk_granularity",
"GranularityLevel",
"__version__",
]
147 changes: 143 additions & 4 deletions dataretrieval/ogc/chunking.py
@@ -9,6 +9,12 @@
cartesian product of chunks. Requests that already fit get a trivial
single-step plan — ``ChunkedCall`` has one code path either way.

Granularity: the planner is conservative by default — it splits only as far as
the byte limit forces. A caller who knows their result is large can opt into a
finer split via the ``chunk_granularity`` context manager
(``"low"`` / ``"medium"`` / ``"high"``); the resolved cap drives
:meth:`ChunkPlan._refine`. See ``chunk_granularity`` for the why and the when.

This module owns the *execution* half — the event loop and bounded
concurrency that drive a plan to completion (``ChunkedCall``) plus the
public ``multi_value_chunked`` decorator. The neighboring concerns live in
@@ -69,8 +75,9 @@
import functools
import os
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from contextvars import copy_context
-from typing import Any, cast
+from typing import Any, Literal, cast, get_args

import httpx
import pandas as pd
@@ -172,6 +179,130 @@ def get_active_client() -> httpx.AsyncClient | None:
return _chunked_client.get()


# Chunk-granularity dial: opt-in to fan a query out *more finely* than the byte
# limit alone requires. Scoped to a ``with chunk_granularity(...):`` block (a
# ContextVar), deliberately NOT an env var (see :func:`chunk_granularity` for
# why). The ambient holds the resolved cap on sub-chunks per axis; ``0`` (the
# default, outside any block) means "chunk only as much as the byte limit needs".
_granularity: Ambient[int] = Ambient("ogc_chunk_granularity", 0)

#: The three accepted granularity levels, as a typing ``Literal`` so a type
#: checker rejects any other value at the call site.
GranularityLevel = Literal["low", "medium", "high"]

#: Valid levels derived from the type, so ``GranularityLevel`` stays the single
#: source of truth for what ``_resolve_granularity`` accepts (mirrors the
#: ``get_args``-based ``_VALID_ON_TIE`` / ``_VALID_FILE_TYPES`` in sibling
#: modules).
_VALID_LEVELS: tuple[GranularityLevel, ...] = get_args(GranularityLevel)

# Granularity's own ceiling on sub-chunks per axis — deliberately NOT tied to the
# concurrency default: fan-out *volume* (how many sub-requests a query becomes)
# is orthogonal to how many run at once (``API_USGS_CONCURRENT``). 32 is a sane
# single-axis ceiling that also bounds the blast radius of an accidental
# ``"high"`` on a very long list; the milder levels are a quarter and a
# sixteenth of it (the three are spaced 4x apart).
_GRANULARITY_MAX_CHUNKS = 32
_GRANULARITY_LEVELS: dict[str, int] = {
    "low": _GRANULARITY_MAX_CHUNKS // 16,  # 2
    "medium": _GRANULARITY_MAX_CHUNKS // 4,  # 8
    "high": _GRANULARITY_MAX_CHUNKS,  # 32
}


def _resolve_granularity(level: GranularityLevel) -> int:
    """
    Map a granularity level name to its per-axis sub-chunk cap.

    Parameters
    ----------
    level : {"low", "medium", "high"}
        The user-supplied level.

    Returns
    -------
    int
        The maximum sub-chunks per multi-value axis for that level
        (``2`` / ``8`` / ``32``).

    Raises
    ------
    ValueError
        If ``level`` is anything other than ``"low"``, ``"medium"``, or
        ``"high"``. Raised eagerly (at the ``with`` statement) so a typo fails
        loudly rather than silently doing nothing.
    """
    if level not in _VALID_LEVELS:
        raise ValueError(
            f"chunk_granularity level must be one of {_VALID_LEVELS}; got {level!r}."
        )
    return _GRANULARITY_LEVELS[level]


@contextmanager
def chunk_granularity(level: GranularityLevel) -> Iterator[None]:
    """
    Scope how finely the OGC getters chunk multi-value requests.

    By default the Water Data / NGWMN getters chunk a request only as much as
    the server's ~8 KB URL-byte limit forces (the fewest sub-requests that
    fit). That is the safe default, but it can be *needlessly* conservative:
    because every sub-request paginates, splitting a large result further is
    usually quota-neutral (ten states pulled as one under-limit request take
    just as many pages as ten per-state requests would). This context manager
    lets a caller who *knows* their pull is large ask for that finer split,
    trading the same pages for more, smaller sub-requests, which gives smoother
    progress, more even concurrency, and a smaller unit of retry/resume.

    Because the library can't tell in advance whether a query is large (ten
    states over a short window might fit in a single page, where extra chunks
    would only burn quota), this is a *deliberate* per-call knob rather than an
    automatic behavior or a process-wide environment variable. Scoping it to a
    ``with`` block keeps an aggressive setting from leaking into unrelated calls
    and accidentally spending quota. Outside any block the getters use the
    conservative default; there is no "off" level because *not* entering the
    block is off. Only the OGC getters (Water Data, NGWMN) read this; wrapping a
    legacy NWIS call in the block is a harmless no-op.

    Parameters
    ----------
    level : {"low", "medium", "high"}
        How aggressively to chunk within the block. Each level caps how many
        sub-chunks a single multi-value argument is split into (``2`` / ``8`` /
        ``32`` for ``"low"`` / ``"medium"`` / ``"high"``), or one value per
        sub-request once the argument has fewer values than the cap. The ceiling
        is fixed (it is *not* tied to ``API_USGS_CONCURRENT``: how finely a query
        splits is orthogonal to how many sub-requests run at once); capping
        ``"high"`` at ``32`` keeps an accidental aggressive level on a very long
        list from exploding into thousands of sub-requests. (With several
        multi-value arguments the per-argument counts still multiply.)

    Yields
    ------
    None

    Raises
    ------
    ValueError
        If ``level`` isn't ``"low"``, ``"medium"``, or ``"high"``. Raised on
        ``with`` entry, before any request is issued.

    Examples
    --------
    >>> from dataretrieval import waterdata
    >>> with waterdata.chunk_granularity("high"):
    ...     df, md = waterdata.get_daily(
    ...         monitoring_location_id=many_sites, parameter_code="00060"
    ...     )  # doctest: +SKIP

    See Also
    --------
    ChunkPlan._refine : the planning-side effect of the level.
    """
    with _granularity(_resolve_granularity(level)):
        yield
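
The scoping behavior of the context manager above can be sketched with the standard library alone. This is a minimal sketch, assuming a plain `contextvars.ContextVar` in place of the module's `Ambient` helper (whose implementation is not shown in this diff); `_cap`, `_CAPS`, `granularity`, and `current_cap` are hypothetical names.

```python
from collections.abc import Iterator
from contextlib import contextmanager
from contextvars import ContextVar

# Hypothetical stand-in for the module's ambient: 0 means "no extra fan-out".
_cap: ContextVar[int] = ContextVar("cap", default=0)
_CAPS = {"low": 2, "medium": 8, "high": 32}


@contextmanager
def granularity(level: str) -> Iterator[None]:
    # Validate before touching the variable, so a typo fails on `with` entry.
    if level not in _CAPS:
        raise ValueError(f"level must be one of {tuple(_CAPS)}; got {level!r}.")
    token = _cap.set(_CAPS[level])
    try:
        yield
    finally:
        # Restore whatever was active before, which keeps nesting safe.
        _cap.reset(token)


def current_cap() -> int:
    return _cap.get()


observed = [current_cap()]
with granularity("high"):
    observed.append(current_cap())
    with granularity("low"):
        observed.append(current_cap())
    observed.append(current_cap())
observed.append(current_cap())
print(observed)  # [0, 32, 2, 32, 0]
```

The value outside every block stays at the default, which matches the "not entering the block is off" design: no explicit "off" level is needed.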


class ChunkedCall:
"""
Stateful handle for a chunked call.
@@ -591,8 +722,9 @@ def multi_value_chunked(
``async def fetch(args) -> (df, response)``, and drives it to
completion via :meth:`ChunkedCall.resume`. The plan splits multi-value
list params and the cql-text filter so each sub-request URL fits the
-    byte limit; an already-fitting request is a one-step plan. See the
-    module docstring for the concurrency model.
+    byte limit; an already-fitting request is a one-step plan, unless an
+    active :func:`chunk_granularity` block asks the plan to fan out more
+    finely. See the module docstring for the concurrency model.

Parameters
----------
@@ -636,7 +768,14 @@ def wrapper(
finalize: _Finalize = _passthrough_result,
) -> tuple[pd.DataFrame, Any]:
limit = _OGC_URL_BYTE_LIMIT if url_limit is None else url_limit
-        plan = ChunkPlan(args, build_request, limit)
+        # Read the granularity dial from the ambient set by
+        # ``chunk_granularity`` (0 = off outside any such block; otherwise the
+        # per-axis sub-chunk cap). It only affects *planning*, done here up
+        # front, so a later resume (which re-issues the already-planned
+        # sub-requests) needs no snapshot.
+        plan = ChunkPlan(
+            args, build_request, limit, max_chunks_per_axis=_granularity.get()
+        )
retry_policy = RetryPolicy.from_env()
# The concurrency cap is resolved inside ``resume()`` from
# ``API_USGS_CONCURRENT``; ``1`` is a sequential gather,
97 changes: 85 additions & 12 deletions dataretrieval/ogc/planning.py
@@ -288,6 +288,21 @@ def _extract_axes(args: dict[str, Any]) -> list[_Axis]:
return axes


def _split_at(chunks: list[list[str]], idx: int) -> None:
    """Replace ``chunks[idx]`` in place with its two contiguous halves.

    The single primitive both planning passes use to fan an axis out. It
    preserves the partition invariants every consumer relies on: *coverage*
    (each atom survives, exactly once) and *contiguous, deterministic order*
    (resume and :meth:`ChunkPlan.iter_sub_args` depend on it). Kept in one
    place so those invariants can't drift between :meth:`ChunkPlan._plan`
    (byte-driven) and :meth:`ChunkPlan._refine` (granularity-driven).
    """
    chunk = chunks[idx]
    mid = len(chunk) // 2
    chunks[idx : idx + 1] = [chunk[:mid], chunk[mid:]]
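
A short worked example may make the two invariants concrete. The function body below mirrors `_split_at` as shown in this diff; the public name `split_at` and the site labels are made up for the illustration.

```python
def split_at(chunks: list[list[str]], idx: int) -> None:
    """Replace chunks[idx] in place with its two contiguous halves."""
    chunk = chunks[idx]
    mid = len(chunk) // 2
    chunks[idx : idx + 1] = [chunk[:mid], chunk[mid:]]


sites = ["A", "B", "C", "D", "E"]  # made-up labels, not real site IDs
chunks = [list(sites)]

split_at(chunks, 0)
print(chunks)  # [['A', 'B'], ['C', 'D', 'E']]

split_at(chunks, 1)
print(chunks)  # [['A', 'B'], ['C'], ['D', 'E']]

# Coverage and order: flattening the chunks gives back the original list.
flat = [atom for chunk in chunks for atom in chunk]
print(flat == sites)  # True
```

Note that a one-atom chunk would produce an empty first half, so callers presumably only pass chunks holding two or more atoms.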


class ChunkPlan:
"""
Strategy for issuing one user-level request as a sequence of
@@ -312,7 +327,17 @@ class ChunkPlan:
Factory that turns a kwargs dict into a sized httpx request,
e.g. ``_construct_api_requests``.
url_limit : int
-        Byte budget for the request (URL + body).
+        Byte budget for the request (URL + body), a hard ceiling every
+        sub-request must fit.
max_chunks_per_axis : int, optional
Soft cap on sub-chunks per multi-value axis (default ``0`` = off).
``0`` chunks only as much as ``url_limit`` requires — the most
conservative plan, fewest sub-requests. A positive cap fans each axis
out into up to ``min(len(atoms), max_chunks_per_axis)`` pieces (never
fewer than the byte budget already forces), so a large multi-page pull
is issued as more, smaller sub-requests. Set from the resolved
:func:`~dataretrieval.ogc.chunking.chunk_granularity` level; see
:meth:`_refine`.

Attributes
----------
@@ -344,6 +369,7 @@ def __init__(
args: dict[str, Any],
build_request: Callable[..., httpx.Request],
url_limit: int,
max_chunks_per_axis: int = 0,
) -> None:
self.args = args
self.axes: list[_Axis] = []
@@ -352,10 +378,10 @@ def __init__(

axes = _extract_axes(args)
if not axes:
-            # No chunkable axis: nothing to split. If the single request fits,
-            # run it verbatim (the common passthrough). ``_safe_request_bytes``
-            # treats an un-constructable URL (httpx.InvalidURL, > 64 KB) as over
-            # budget.
+            # No chunkable axis: nothing to split, and the granularity cap
+            # (``max_chunks_per_axis``) has nothing to act on either. If the
+            # single request fits, run it verbatim (the common passthrough).
+            # ``_safe_request_bytes`` treats an un-constructable URL
+            # (httpx.InvalidURL, > 64 KB) as over budget.
if _safe_request_bytes(build_request, args, url_limit) <= url_limit:
return
# Over budget. A filter the chunker doesn't manage — cql-json — is
@@ -388,14 +414,29 @@ def __init__(
except httpx.InvalidURL:
initial_request = None

+        fits = False
         if initial_request is not None:
             self.canonical_url = str(initial_request.url)
-            if _request_bytes(initial_request) <= url_limit:
-                return
+            fits = _request_bytes(initial_request) <= url_limit
+
+        # A request that already fits and hasn't opted into finer chunking is
+        # the common passthrough: leave ``axes``/``chunks`` empty so
+        # ``total == 1`` and ``iter_sub_args`` yields the original args
+        # verbatim. Only when ``max_chunks_per_axis`` asks for extra fan-out do
+        # we set the axes up to be refined below.
+        if fits and max_chunks_per_axis <= 0:
+            return

         self.axes = axes
         self.chunks = {axis.arg_key: [list(axis.atoms)] for axis in axes}
-        self._plan(build_request, url_limit)
+        if not fits:
+            # Hard pass: greedy-halve until every worst-case sub-request fits
+            # the byte budget (may raise ``Unchunkable``).
+            self._plan(build_request, url_limit)
+        # Soft pass: optionally split further than the byte budget requires.
+        # Purely additive: never re-raises, and the byte budget stays
+        # satisfied; a no-op at ``max_chunks_per_axis <= 0``.
+        self._refine(max_chunks_per_axis)

if self.canonical_url is None:
# Original URL was un-constructable (httpx.InvalidURL); fall
@@ -447,10 +488,42 @@ def _plan(
f"sub-request). Reduce input sizes, shorten or simplify "
f"the filter, or split the call manually."
)
-            axis_chunks = self.chunks[biggest_axis.arg_key]
-            chunk = axis_chunks[biggest_idx]
-            mid = len(chunk) // 2
-            axis_chunks[biggest_idx : biggest_idx + 1] = [chunk[:mid], chunk[mid:]]
+            _split_at(self.chunks[biggest_axis.arg_key], biggest_idx)

    def _refine(self, max_chunks_per_axis: int) -> None:
        """
        Fan each axis out more finely than the byte budget alone requires:
        the granularity dial (see
        :func:`~dataretrieval.ogc.chunking.chunk_granularity` for why a caller
        would want this).

        Each axis is split until it holds at least
        ``min(len(atoms), max_chunks_per_axis)`` chunks: up to the cap, or one
        atom per chunk for a shorter axis. Purely additive, it only ever *splits*
        existing chunks, so the byte pass's work and the ``url_limit`` invariant
        are both preserved (an axis the byte pass already split past the cap is
        left alone), and it never raises. A no-op at ``max_chunks_per_axis <= 0``.

        Parameters
        ----------
        max_chunks_per_axis : int
            Soft cap on sub-chunks per axis (``0`` = off), the resolved
            granularity level. A shorter axis simply saturates at one atom per
            chunk.
        """
        if max_chunks_per_axis <= 0:
            return
        for axis in self.axes:
            chunks = self.chunks[axis.arg_key]
            target = min(len(axis.atoms), max_chunks_per_axis)
            # ``target <= len(atoms)`` guarantees a splittable chunk each pass, so
            # this reaches exactly ``target`` and terminates. Split the chunk with
            # the most *atoms* (``_plan`` splits by *bytes*): here we even out
            # cardinality for smooth fan-out, not URL size. Ties take the lowest
            # index, keeping the split deterministic.
            while len(chunks) < target:
                idx, _ = max(enumerate(chunks), key=lambda kv: len(kv[1]))
                _split_at(chunks, idx)
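
To see what the refinement loop above produces, here is a standalone sketch for a single axis. It assumes the axis starts as one chunk (that is, the byte pass had nothing to split); `refine` and `split_at` are hypothetical free functions that mirror the method and helper in this diff, not library API.

```python
def split_at(chunks: list[list[str]], idx: int) -> None:
    """Replace chunks[idx] in place with its two contiguous halves."""
    chunk = chunks[idx]
    mid = len(chunk) // 2
    chunks[idx : idx + 1] = [chunk[:mid], chunk[mid:]]


def refine(atoms: list[str], cap: int) -> list[list[str]]:
    """Split one axis until it holds min(len(atoms), cap) chunks."""
    chunks = [list(atoms)]  # assumption: the byte pass left a single chunk
    if cap <= 0:
        return chunks  # dial is off: keep the conservative plan
    target = min(len(atoms), cap)
    while len(chunks) < target:
        # Pick the chunk with the most atoms; max() keeps the first on ties.
        idx, _ = max(enumerate(chunks), key=lambda kv: len(kv[1]))
        split_at(chunks, idx)
    return chunks


sites = [f"S{i}" for i in range(10)]
sizes = {cap: [len(c) for c in refine(sites, cap)] for cap in (0, 2, 8, 32)}
for cap, per_chunk in sizes.items():
    print(cap, per_chunk)
# 0 [10]
# 2 [5, 5]
# 8 [1, 1, 1, 1, 1, 2, 1, 2]
# 32 [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
```

With ten values, `"high"` (cap 32) saturates at one value per chunk, while `"medium"` (cap 8) stops at eight chunks of slightly uneven size because halving is greedy rather than a balanced partition.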

def _worst_case_args(self) -> dict[str, Any]:
"""
3 changes: 3 additions & 0 deletions dataretrieval/waterdata/__init__.py
@@ -9,6 +9,7 @@

from __future__ import annotations

from dataretrieval.ogc.chunking import GranularityLevel, chunk_granularity
from dataretrieval.ogc.filters import FILTER_LANG

# Public API exports
@@ -50,6 +51,8 @@
"PROFILE_LOOKUP",
"SERVICES",
"WATERDATA_SERVICES",
"GranularityLevel",
"chunk_granularity",
"get_channel",
"get_codes",
"get_combined_metadata",