From cd70929ef1ea4c0bd9fbcc80f1f3d31fb5bc7752 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Sun, 17 May 2026 09:56:44 -0500 Subject: [PATCH 1/6] feat(waterdata): Add multi-value GET-parameter chunker for OGC API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For multi-value waterdata queries (e.g. monitoring_location_id with ~300+ sites), the GET URL produced by PR #233 blows past the server's ~8 KB nginx buffer and the API returns HTTP 414. This PR adds a chunker that transparently splits long list params across sub-requests so each URL fits the byte budget. The chunker is a decorator applied to ``_fetch_once`` outside the existing ``@filters.chunked`` (CQL chunker), so list-chunking is the outer loop and filter-chunking is the inner loop: @chunking.multi_value_chunked(build_request=_construct_api_requests) @filters.chunked(build_request=_construct_api_requests) def _fetch_once(args): ... Key design points: - ``_plan_chunks`` greedy-halves the largest chunk across all dimensions until the worst-case sub-request fits ``url_limit`` (URL + body, via ``_request_bytes``, so POST routes are sized correctly). Cartesian product of per-dim partitions becomes the sub-request set; capped at ``max_chunks=1000``. - ``_filter_aware_probe_args`` coordinates with ``filters.chunked``: the planner probes URL length using a synthetic clause that matches the inner filter chunker's bail-floor size (longest single clause, scaled by worst-case URL encoding ratio). Without this coordination, the outer planner would raise ``RequestTooLarge`` on combinations the stacked chunkers can actually handle. - ``QuotaExhausted`` mid-call guard reads ``x-ratelimit-remaining`` after each sub-request; if it drops below ``quota_safety_floor=50``, the wrapper raises with the partial frame, completed-chunk offset, and last observed remaining quota — letting callers salvage or resume after the rate-limit window resets, rather than crash into a silent mid-pagination 429. - ``RequestTooLarge`` is raised when the smallest reducible plan still exceeds ``url_limit`` (every multi-value param at a singleton chunk and any chunkable filter at the inner chunker's bail floor) or when the cartesian product exceeds ``max_chunks``. - All defaults (``url_limit``, ``max_chunks``, ``quota_safety_floor``) resolve at call time, so monkey-patching ``filters._WATERDATA_URL_ BYTE_LIMIT`` for tests / non-default quotas affects the decorator uniformly. Public additions: - ``dataretrieval.waterdata.chunking.multi_value_chunked`` - ``dataretrieval.waterdata.chunking.RequestTooLarge`` - ``dataretrieval.waterdata.chunking.QuotaExhausted`` (carries ``partial_frame``, ``partial_response``, ``completed_chunks``, ``total_chunks``, ``remaining``) Tests (30 new): - ``_filter_aware_probe_args`` worst-case-clause modelling - ``_plan_chunks`` greedy halving, RequestTooLarge floor, filter- chunker coordination, ``max_chunks`` cap, lazy-default reads - ``multi_value_chunked`` pass-through, cartesian-product shape, end-to-end with stacked filter chunker - ``QuotaExhausted`` header parsing, mid-call abort, last-chunk no- abort, zero-floor disable - ``RequestTooLarge`` message contents and triggering conditions End-to-end correctness verified against the live API: identical per-site cell-for-cell output between unchunked (single call) and chunked (forced fan-out via patched limit) paths. Co-Authored-By: Claude Opus 4.7 (1M context) --- NEWS.md | 2 + dataretrieval/waterdata/api.py | 15 + dataretrieval/waterdata/chunking.py | 478 ++++++++++++++++++++++++ dataretrieval/waterdata/filters.py | 51 ++- dataretrieval/waterdata/utils.py | 61 ++- tests/waterdata_test.py | 561 ++++++++++++++++++++++++++++ 6 files changed, 1146 insertions(+), 22 deletions(-) create mode 100644 dataretrieval/waterdata/chunking.py diff --git a/NEWS.md b/NEWS.md index 7761e29b..3d9a5814 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,5 @@ +**05/17/2026:** The OGC `waterdata` getters (`get_daily`, `get_continuous`, `get_field_measurements`, and the rest of the multi-value-capable functions) now transparently chunk requests whose URLs would otherwise exceed the server's ~8 KB byte limit. A common chained-query pattern — pull a long site list from `get_monitoring_locations`, then feed it into `get_daily` — previously failed with HTTP 414 once the resulting URL grew past the limit; it now fans out across multiple sub-requests under the hood and returns one combined DataFrame. The chunker coordinates with the existing CQL `filter` chunker (long top-level-`OR` filters still split correctly when used alongside long multi-value lists), caps cartesian-product plans at 1000 sub-requests (the default USGS hourly quota), and aborts mid-call with a structured `QuotaExhausted` exception — carrying the partial result and a resume offset — if `x-ratelimit-remaining` drops below a safety floor. Mirrors R `dataRetrieval`'s [#870](https://github.com/DOI-USGS/dataRetrieval/pull/870), generalized to N dimensions. Note one metadata-behavior change for paginated/chunked calls: `BaseMetadata.url` still reflects the user's original query (unchanged), but `BaseMetadata.header` now carries the *last* page's / sub-request's headers (so `x-ratelimit-remaining` is current) rather than the first, and `BaseMetadata.query_time` is now the cumulative wall-clock across pages instead of the first page's elapsed. + **05/16/2026:** Fixed silent truncation in the paginated `waterdata` request loops (`_walk_pages` and `get_stats_data`). Mid-pagination failures (HTTP 429, 5xx, network error) were previously swallowed — pagination would quietly stop and the function would return whatever rows it had collected, leaving callers with truncated DataFrames they had no way to detect. The loops now status-check every page like the initial request and raise `RuntimeError` on any failure, with the upstream exception chained as `__cause__` and a short menu of recovery actions (wait and retry, reduce the request, or obtain an API token) in the message. **Behavior change**: callers that previously consumed partial DataFrames on transient upstream blips will now see an exception; retry the call (possibly with a smaller `limit` or narrower query). **05/07/2026:** Bumped the declared minimum Python version from **3.8** to **3.9** (`pyproject.toml`'s `requires-python` and the ruff target). This brings the manifest in line with what was already being tested — CI's matrix has long covered only 3.9, 3.13, and 3.14, the `waterdata` test module already skipped itself on Python < 3.10, and several modules already use 3.9-only stdlib (e.g. `zoneinfo`). Users on 3.8 will no longer be able to install the package; please upgrade. diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index ad268194..025aafcd 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -230,6 +230,21 @@ def get_daily( ... parameter_code="00060", ... last_modified="P7D", ... ) + + >>> # Chain queries: pull all stream sites in a state, then their + >>> # daily discharge for the last week. The site list can be hundreds + >>> # of values long — the request is transparently chunked across + >>> # multiple sub-requests so the URL stays under the server's byte + >>> # limit. Combined output looks like a single query. + >>> sites_df, _ = dataretrieval.waterdata.get_monitoring_locations( + ... state_name="Ohio", + ... site_type="Stream", + ... ) + >>> df, md = dataretrieval.waterdata.get_daily( + ... monitoring_location_id=sites_df["monitoring_location_id"].tolist(), + ... parameter_code="00060", + ... time="P7D", + ... ) """ service = "daily" output_id = "daily_id" diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py new file mode 100644 index 00000000..bbdf5b12 --- /dev/null +++ b/dataretrieval/waterdata/chunking.py @@ -0,0 +1,478 @@ +"""Multi-value GET-parameter chunking for the Water Data OGC getters. + +PR 233 routes most services through GET with comma-separated values +(e.g. ``monitoring_location_id=USGS-A,USGS-B,...``). Long lists can blow +the server's ~8 KB URL byte limit. This module adds a decorator that +sits OUTSIDE ``filters.chunked`` and splits multi-value list params +across multiple sub-requests so each URL fits. See ``get_daily``'s +docstring for an end-to-end chained-query example. + +Design (orthogonal to filter chunking): + +- N-dimensional cartesian product: for each chunkable list param, the + values are partitioned into sub-lists; the planner emits the cartesian + product of those partitions. Sub-chunks of the same dim never overlap, + so frame concat needs no dedup across multi-value chunks. +- Greedy halving of the largest chunk in any dim until the worst-case + sub-request URL fits the limit. Minimises total request count. +- Date params, ``bbox``, and ``properties`` are not chunked: dates are + intervals not enumerable sets; bbox is a coord array; ``properties`` + determines output schema and chunking it would shard columns. + +Coordination with ``filters.chunked``: +The planner probes the URL with a synthetic clause sized to the inner +chunker's bail floor — ``len(longest_clause) * max(per-clause encoding +ratio)`` — when a chunkable filter is present. The inner chunker bails +(emits the full filter) when any single clause's URL-encoded length +exceeds its per-sub-request budget; mirroring +``filters._effective_filter_budget``, that floor already accounts for +the worst per-call encoding ratio, so a long alphanumeric clause +coexisting with a shorter heavily-encoded clause is sized correctly. +Without this coordination, a long OR-filter plus multi-value lists +would trigger a premature ``RequestTooLarge`` even though the combined +chunkers would have made things fit. +""" + +from __future__ import annotations + +import functools +import itertools +import math +from collections.abc import Callable +from typing import Any +from urllib.parse import quote_plus + +import pandas as pd +import requests + +from . import filters +from .filters import ( + _combine_chunk_frames, + _combine_chunk_responses, + _FetchOnce, + _is_chunkable, + _max_per_clause_encoding_ratio, + _split_top_level_or, +) + +# Params that look like lists but must NOT be chunked. ``properties`` is +# excluded because it defines the response schema; chunking it would +# return frames with different columns per sub-request. ``bbox`` is a +# fixed 4-element coord tuple. Date params are intervals not sets. The +# CQL ``filter`` (and its ``filter_lang``) is a string that has its own +# inner chunker (``filters.chunked``); if a caller passes ``filter`` as +# a list, treating it as a multi-value param would emit malformed CQL. +_NEVER_CHUNK = frozenset( + { + "properties", + "bbox", + "datetime", + "last_modified", + "begin", + "begin_utc", + "end", + "end_utc", + "time", + "filter", + "filter_lang", + } +) + +# Default cap on the number of sub-requests a single chunked call may +# emit. The USGS Water Data API rate-limits each HTTP request (including +# pagination), so the true budget is ``hourly_quota / avg_pages_per_chunk``. +# 1000 matches the default hourly quota and is a reasonable upper bound +# for single-page sub-requests; tune lower if your queries paginate. +# Override per-decorator via ``max_chunks=`` or by monkeypatching this +# module attribute — both the decorator wrapper and ``_plan_chunks`` +# read it lazily. +_DEFAULT_MAX_CHUNKS = 1000 + +# When ``x-ratelimit-remaining`` drops below this between sub-requests, +# the chunker bails with ``QuotaExhausted`` rather than risk a mid-call +# HTTP 429. Carries the partial result so callers can resume from a +# known offset instead of retrying the whole chunked call from scratch. +_DEFAULT_QUOTA_SAFETY_FLOOR = 50 + +# Response header USGS uses to advertise remaining hourly quota. +_QUOTA_HEADER = "x-ratelimit-remaining" + +# Sentinel returned by ``_read_remaining`` when the response has no +# parseable ``x-ratelimit-remaining`` header. Large enough to beat any +# plausible safety floor so a missing/malformed header doesn't trigger +# spurious ``QuotaExhausted`` aborts. +_QUOTA_UNKNOWN = 10**9 + + +class RequestTooLarge(ValueError): + """Raised when a chunked request cannot be issued. Two cases: + (1) URL exceeds the byte limit even with every multi-value param at + a singleton chunk and any chunkable filter at the inner chunker's + bail-floor size (the URL contribution of its longest single + OR-clause, after URL-encoding); (2) the cartesian-product plan + would issue more than ``max_chunks`` sub-requests.""" + + +class QuotaExhausted(RuntimeError): + """Raised mid-chunked-call when the API's reported remaining quota + (``x-ratelimit-remaining`` header) drops below the configured safety + floor. The chunker stops before issuing the next sub-request to + avoid a mid-call HTTP 429 that would silently truncate paginated + results (see PR #273 for the pagination side of that bug). + + The exception carries everything needed to resume: the combined + partial frame from completed sub-requests, the metadata for the + last successful sub-request, the number of chunks completed out of + the plan total, and the last-observed ``remaining`` value. + + Attributes + ---------- + partial_frame : pd.DataFrame + Concatenated, deduplicated result of every sub-request that + completed before the floor was crossed. + partial_response : requests.Response + Aggregated response with the canonical URL restored to the + user's full original query (rebuilt from ``args`` rather than + copied from the first sub-request, which only carries the + first chunk's parameters). Headers come from the last + completed sub-request (so callers inspecting + ``x-ratelimit-remaining`` see current quota state), and + ``elapsed`` is summed across completed sub-requests. Wrap in + ``BaseMetadata`` to surface to the caller alongside the + partial frame. + completed_chunks : int + Number of sub-requests successfully completed. + total_chunks : int + Total sub-requests in the cartesian-product plan. + remaining : int + Last observed ``x-ratelimit-remaining`` value. + """ + + def __init__( + self, + *, + partial_frame: pd.DataFrame, + partial_response: requests.Response, + completed_chunks: int, + total_chunks: int, + remaining: int, + ) -> None: + super().__init__( + f"x-ratelimit-remaining dropped to {remaining} after " + f"{completed_chunks}/{total_chunks} chunks; aborting to avoid " + f"mid-call HTTP 429. Catch QuotaExhausted to access " + f".partial_frame and resume from chunk {completed_chunks}." + ) + self.partial_frame = partial_frame + self.partial_response = partial_response + self.completed_chunks = completed_chunks + self.total_chunks = total_chunks + self.remaining = remaining + + +def _chunkable_params(args: dict[str, Any]) -> dict[str, list[Any]]: + """Return ``{name: list(values)}`` for every list/tuple kwarg with + >1 element that is allowed to chunk.""" + return { + k: list(v) + for k, v in args.items() + if k not in _NEVER_CHUNK and isinstance(v, (list, tuple)) and len(v) > 1 + } + + +def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]: + """Substitute the filter with a synthetic ASCII clause sized to the + inner chunker's bail floor, so the planner's URL probe matches what + the inner chunker would emit. + + The inner ``filters.chunked`` bails (emits the full filter) when any + single OR-clause's URL-encoded length exceeds the per-sub-request + budget. Mirroring ``filters._effective_filter_budget``, that floor + is ``len(longest_clause) * max(per-clause encoding ratio)``. + Substituting an ASCII clause of that exact length makes + ``quote_plus`` a no-op, so the URL builder sees exactly the + bail-floor byte count. + """ + filter_expr = args.get("filter") + filter_lang = args.get("filter_lang") + if not _is_chunkable(filter_expr, filter_lang): + return args + parts = _split_top_level_or(filter_expr) + if len(parts) < 2: + return args # one-clause filter — inner chunker can't shrink it + longest_raw = max(len(p) for p in parts) + probe_size = math.ceil(longest_raw * _max_per_clause_encoding_ratio(parts)) + return {**args, "filter": "x" * probe_size} + + +def _chunk_bytes(chunk: list[Any]) -> int: + """URL-encoded byte length of ``chunk`` when comma-joined into a + URL parameter value. + + Used as the planner's biggest-chunk comparator in + ``_worst_case_args`` and the halving loop. ``quote_plus`` (rather + than raw ``,``-join length) keeps the comparator faithful to what + the real URL builder produces, so values containing characters + that expand under URL encoding (``%``, ``+``, ``/``, ``&``, …) + can't be mis-ranked. For typical USGS multi-value workloads + (alphanumeric IDs and codes) raw and encoded lengths are equal, + but the encoded form is always correct. + """ + return len(quote_plus(",".join(map(str, chunk)))) + + +def _request_bytes(req: requests.PreparedRequest) -> int: + """Total bytes of a prepared request: URL + body. + + GET routes have ``body=None`` and reduce to URL length. POST routes + (CQL2 JSON body) need body bytes — the URL stays short regardless of + payload, so URL-only sizing would underestimate the request and skip + chunking when it's needed. + + Raises ``TypeError`` on non-sizable bodies (generators, file-like + streams). Size-based planning needs a deterministic byte count; + silently treating an unknown body as zero bytes would under-chunk + and let the request blow past the server's POST-body limit. + """ + url_len = len(req.url) + body = req.body + if body is None: + return url_len + if isinstance(body, (bytes, bytearray)): + return url_len + len(body) + if isinstance(body, str): + return url_len + len(body.encode("utf-8")) + raise TypeError( + f"multi_value_chunked cannot size a request body of type " + f"{type(body).__name__!r}; pass str, bytes, or None. Streaming " + f"bodies (generators, file-like) are not supported because the " + f"planner needs a deterministic byte count up front." + ) + + +def _plan_total(plan: dict[str, list[list[Any]]]) -> int: + """Sub-request count a plan will issue: the cartesian product of + per-dim chunk counts. Computed in two places (planner's max_chunks + early-bail and wrapper's QuotaExhausted payload) — centralized + here so the two can't drift.""" + return math.prod(len(chunks) for chunks in plan.values()) + + +def _worst_case_args( + probe_args: dict[str, Any], plan: dict[str, list[list[Any]]] +) -> dict[str, Any]: + """Args representing the worst-case sub-request the plan will issue: + each dim's largest chunk (by URL-encoded bytes), composed onto + the ``probe_args`` already returned by ``_filter_aware_probe_args`` + so any chunkable filter sits at the inner chunker's bail-floor + size. The planner feeds these args through ``_request_bytes`` to + decide whether the biggest sub-request fits the budget.""" + out = dict(probe_args) + for k, chunks in plan.items(): + out[k] = max(chunks, key=_chunk_bytes) + return out + + +def _plan_chunks( + args: dict[str, Any], + build_request: Callable[..., Any], + url_limit: int, + max_chunks: int | None = None, +) -> dict[str, list[list[Any]]] | None: + """Greedy halving until the worst-case sub-request fits ``url_limit``. + + Budget is total request bytes (URL + body, via ``_request_bytes``) + so POST routes size correctly — see ``multi_value_chunked`` for the + parameter-name caveat. + + Returns ``None`` when no chunking is needed (request as-is fits or + no chunkable lists). Raises ``RequestTooLarge`` when: + - the smallest reducible plan still exceeds ``url_limit`` (every + multi-value param at a singleton chunk and any chunkable filter + already at the inner chunker's bail-floor size), or + - the cartesian-product plan exceeds ``max_chunks`` sub-requests + (the hourly API budget); checked after each split so we bail + promptly once the cap is unreachable. + + ``max_chunks`` defaults to ``_DEFAULT_MAX_CHUNKS`` resolved at call + time, so monkeypatching the module constant takes effect for + direct callers too. + """ + if max_chunks is None: + max_chunks = _DEFAULT_MAX_CHUNKS + if max_chunks < 1: + raise ValueError( + f"max_chunks must be >= 1; got {max_chunks}. Zero or negative " + f"values would silently bypass the cap on the no-chunking path." + ) + chunkable = _chunkable_params(args) + if not chunkable: + return None + probe_args = _filter_aware_probe_args(args) + if _request_bytes(build_request(**probe_args)) <= url_limit: + return None + + plan: dict[str, list[list[Any]]] = {k: [v] for k, v in chunkable.items()} + + while True: + worst = _worst_case_args(probe_args, plan) + if _request_bytes(build_request(**worst)) <= url_limit: + return plan + + # Largest splittable chunk across all dims, by URL-encoded bytes. + splittable = ( + (dim, idx, chunk) + for dim, dim_chunks in plan.items() + for idx, chunk in enumerate(dim_chunks) + if len(chunk) > 1 + ) + biggest = max(splittable, key=lambda t: _chunk_bytes(t[2]), default=None) + if biggest is None: + raise RequestTooLarge( + f"Request exceeds {url_limit} bytes (URL + body) at the " + f"smallest reducible plan: every multi-value parameter " + f"at a singleton chunk and any chunkable filter at the " + f"inner chunker's bail-floor size. Reduce the number " + f"of values, shorten the filter, or split the call " + f"manually." + ) + dim, idx, chunk = biggest + mid = len(chunk) // 2 + plan[dim] = plan[dim][:idx] + [chunk[:mid], chunk[mid:]] + plan[dim][idx + 1 :] + + # Each split only grows the cartesian product, so once we + # cross max_chunks we can never come back under. Bail now + # rather than keep splitting (the URL probe could still take + # many more iterations). + total = _plan_total(plan) + if total > max_chunks: + raise RequestTooLarge( + f"Chunked plan would issue {total} sub-requests, exceeding " + f"max_chunks={max_chunks} (USGS API's default hourly rate " + f"limit per key). Reduce input list sizes, narrow the time " + f"window, or raise max_chunks if you have a higher quota." + ) + + +def _read_remaining(response: requests.Response) -> int: + """Parse ``x-ratelimit-remaining`` from a response. Missing or + malformed header → return ``_QUOTA_UNKNOWN`` so the safety check + treats it as 'plenty of quota' (don't abort on header glitches).""" + raw = response.headers.get(_QUOTA_HEADER) + if raw is None: + return _QUOTA_UNKNOWN + try: + return int(raw) + except (TypeError, ValueError): + return _QUOTA_UNKNOWN + + +def multi_value_chunked( + *, + build_request: Callable[..., Any], + url_limit: int | None = None, + max_chunks: int | None = None, + quota_safety_floor: int | None = None, +) -> Callable[[_FetchOnce], _FetchOnce]: + """Decorator that splits multi-value list params across sub-requests + so each sub-request fits ``url_limit`` bytes (defaults to + ``filters._WATERDATA_URL_BYTE_LIMIT``) and the cartesian-product + plan stays ≤ ``max_chunks`` sub-requests (defaults to + ``_DEFAULT_MAX_CHUNKS``). All defaults are resolved at call time so + tests/users that patch the module constants affect this decorator + uniformly. + + ``url_limit`` is enforced against total request bytes (URL + body, + via ``_request_bytes``); the name reflects the dominant GET case + where body is empty. POST routes (e.g. ``monitoring-locations`` via + CQL2 JSON) are conservatively sized — never under-chunks, but may + over-chunk at the body's true ceiling. + + Between sub-requests the wrapper reads ``x-ratelimit-remaining`` from + each response. If it drops below ``quota_safety_floor`` (default + ``_DEFAULT_QUOTA_SAFETY_FLOOR``), the wrapper raises ``QuotaExhausted`` + carrying the combined partial result and the chunk offset so callers + can resume after the hourly window resets, instead of crashing into + a mid-pagination HTTP 429 (which the upstream pagination loop in + ``_walk_pages`` historically truncated silently — see PR #273). + + Sits OUTSIDE ``@filters.chunked``: list-chunking is the outer loop, + filter-chunking is the inner loop. The wrapped function has the same + signature as ``filters.chunked`` expects — ``(args: dict) -> (frame, + response)`` — so the two decorators compose cleanly. The planner is + filter-aware so it doesn't raise prematurely when the inner filter + chunker would have shrunk the per-sub-request URL on its own. + + Sub-requests run sequentially with no per-call timeout enforced here. + A hung single sub-request will block the entire chunked call; the + caller is responsible for configuring an HTTP-layer timeout (e.g. + via a ``requests.Session`` wrapper) if bounded latency matters. + + Cartesian-product iteration order is deterministic for a given + ``args`` dict: the wrapper iterates ``plan.values()`` in insertion + order (Python 3.7+ guarantee), which equals the order in which + chunkable params appeared in ``args``. For the public waterdata + getters that order is the function-signature order, so + ``QuotaExhausted.completed_chunks`` maps to the same sub-requests + across repeated calls with the same arguments — resume is + well-defined. + """ + + def decorator(fetch_once: _FetchOnce) -> _FetchOnce: + @functools.wraps(fetch_once) + def wrapper( + args: dict[str, Any], + ) -> tuple[pd.DataFrame, requests.Response]: + limit = ( + url_limit + if url_limit is not None + else filters._WATERDATA_URL_BYTE_LIMIT + ) + floor = ( + quota_safety_floor + if quota_safety_floor is not None + else _DEFAULT_QUOTA_SAFETY_FLOOR + ) + plan = _plan_chunks(args, build_request, limit, max_chunks) + if plan is None: + return fetch_once(args) + + # Pre-build the canonical URL representing the user's full + # original query. The chunker sends sub-requests with sliced + # multi-value lists; without this restore, the aggregated + # response's ``.url`` would only show the first chunk and + # callers logging ``md.url`` for reproducibility would see a + # truncated view of their own query. + canonical_url = build_request(**args).url + + keys = list(plan) + total = _plan_total(plan) + frames: list[pd.DataFrame] = [] + responses: list[requests.Response] = [] + for i, combo in enumerate(itertools.product(*(plan[k] for k in keys))): + sub_args = {**args, **dict(zip(keys, combo))} + frame, response = fetch_once(sub_args) + frames.append(frame) + responses.append(response) + # Skip the quota check after the last sub-request — + # nothing left to abort. + if i < total - 1: + remaining = _read_remaining(response) + if remaining < floor: + partial = _combine_chunk_responses(responses) + partial.url = canonical_url + raise QuotaExhausted( + partial_frame=_combine_chunk_frames(frames), + partial_response=partial, + completed_chunks=i + 1, + total_chunks=total, + remaining=remaining, + ) + + combined = _combine_chunk_responses(responses) + combined.url = canonical_url + return _combine_chunk_frames(frames), combined + + return wrapper # type: ignore[return-value] + + return decorator diff --git a/dataretrieval/waterdata/filters.py b/dataretrieval/waterdata/filters.py index 4c136b82..deac263e 100644 --- a/dataretrieval/waterdata/filters.py +++ b/dataretrieval/waterdata/filters.py @@ -152,6 +152,18 @@ def _chunk_cql_or(expr: str, max_len: int = _CQL_FILTER_CHUNK_LEN) -> list[str]: return chunks +def _max_per_clause_encoding_ratio(parts: list[str]) -> float: + """Worst per-clause ``len(quote_plus(p)) / len(p)`` across OR-clauses. + + Any sub-request chunk could end up containing only the heavier-encoding + clauses, so per-sub-request byte budgets must be sized against the + worst (not average) ratio to avoid overflow. Used by both this + module's filter chunker and the outer ``chunking._filter_aware_probe_args``; + pinning the formula here keeps the two from drifting. + """ + return max(len(quote_plus(p)) / len(p) for p in parts) + + def _effective_filter_budget( args: dict[str, Any], filter_expr: str, @@ -163,8 +175,7 @@ def _effective_filter_budget( non-filter URL bytes by building the request with a 1-byte placeholder filter, subtract from the URL limit to get the bytes available for the encoded filter, then convert back to raw CQL bytes via the *maximum* - per-clause encoding ratio (a chunk could contain only the heavier-encoding - clauses, so budgeting by the average ratio could overflow). + per-clause encoding ratio. """ # Fast path: encoded filter clearly fits with room for any plausible # non-filter URL. Skips the PreparedRequest build and splitter scan. @@ -179,7 +190,7 @@ def _effective_filter_budget( # the caller sees one 414 instead of N parallel sub-request failures. return len(filter_expr) + 1 parts = _split_top_level_or(filter_expr) or [filter_expr] - encoding_ratio = max(len(quote_plus(p)) / len(p) for p in parts) + encoding_ratio = _max_per_clause_encoding_ratio(parts) return max(100, int(available_url_bytes / encoding_ratio)) @@ -268,13 +279,24 @@ def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame: def _combine_chunk_responses( responses: list[requests.Response], ) -> requests.Response: - """Return one response: first chunk's URL/headers + summed ``elapsed``. - - Mutates the first response in place (only ``elapsed``); downstream only - reads ``elapsed`` (in ``BaseMetadata.query_time``), URL, and headers. + """Return one response with the last chunk's headers (for current + rate-limit state) and summed ``elapsed`` (for total wall-clock). + + The returned response's ``.url`` is the *first chunk's* URL, which + only reflects the first slice of the user's query. Callers wanting + the canonical original-query URL on ``BaseMetadata`` must overwrite + ``.url`` themselves (using ``build_request(**original_args).url``); + the decorator wrappers in ``filters.chunked`` and + ``chunking.multi_value_chunked`` do this. + + Mutates the first response in place: ``.headers`` is replaced with + the last response's headers and ``.elapsed`` is accumulated across + all chunks. Downstream reads ``.url``, ``.headers``, and + ``.elapsed`` (via ``BaseMetadata``). """ head = responses[0] if len(responses) > 1: + head.headers = responses[-1].headers head.elapsed = sum((r.elapsed for r in responses[1:]), start=head.elapsed) return head @@ -295,8 +317,12 @@ def chunked(*, build_request: Callable[..., Any]) -> Callable[[_FetchOnce], _Fet - Chunkable cql-text filter: run the lexicographic-pitfall guard, split into URL-length-safe sub-expressions, call the wrapped function once per chunk, concatenate frames (drop empties, dedup by feature ``id``), - and return an aggregated response (first chunk's URL/headers, summed - ``elapsed``). + and return an aggregated response with ``.url`` restored to the + canonical full-filter URL (so ``BaseMetadata.url`` reflects the + user's original query rather than the first filter chunk), last + chunk's headers (so callers see current ``x-ratelimit-remaining``, + which the outer ``multi_value_chunked`` decorator's ``QuotaExhausted`` + guard depends on), and summed ``elapsed``. Either way the return shape matches the undecorated function's, so the caller wraps the response in ``BaseMetadata`` the same way in both paths. @@ -327,7 +353,12 @@ def wrapper( frames.append(frame) responses.append(response) - return _combine_chunk_frames(frames), _combine_chunk_responses(responses) + # Restore the canonical URL representing the user's full filter + # (the aggregated response otherwise carries only the first + # filter-chunk's URL, which misleads callers logging md.url). + combined = _combine_chunk_responses(responses) + combined.url = build_request(**args).url + return _combine_chunk_frames(frames), combined return wrapper # type: ignore[return-value] diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 9245bb92..fe419100 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -5,7 +5,7 @@ import os import re from collections.abc import Iterable, Mapping -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, get_args from zoneinfo import ZoneInfo @@ -14,7 +14,7 @@ from dataretrieval import __version__ from dataretrieval.utils import BaseMetadata -from dataretrieval.waterdata import filters +from dataretrieval.waterdata import chunking, filters from dataretrieval.waterdata.types import ( PROFILE_LOOKUP, PROFILES, @@ -644,6 +644,26 @@ def _get_resp_data(resp: requests.Response, geopd: bool) -> pd.DataFrame: return df +def _finalize_paginated_response( + initial: requests.Response, + last: requests.Response, + total_elapsed: timedelta, +) -> None: + """Carry the last page's headers + cumulative elapsed onto the initial + response in place. + + The initial response stays canonical for ``md.url`` (user's original + query), but its ``.headers`` and ``.elapsed`` are overwritten so the + multi-value chunker's ``QuotaExhausted`` guard sees current + ``x-ratelimit-remaining`` and ``md.query_time`` reflects total + wall-clock across pages. No-op when ``initial is last`` (single page). + """ + if last is initial: + return + initial.headers = last.headers + initial.elapsed = total_elapsed + + def _walk_pages( geopd: bool, req: requests.PreparedRequest, @@ -669,7 +689,11 @@ def _walk_pages( pd.DataFrame A DataFrame containing the aggregated results from all pages. requests.Response - The initial response object containing metadata about the first request. + Aggregated response: the initial request's URL (for query + identity), the final page's headers (so downstream callers see + current rate-limit state, which the multi-value chunker's + ``QuotaExhausted`` guard relies on), and cumulative ``elapsed`` + summed across every page. Raises ------ @@ -700,9 +724,11 @@ def _walk_pages( try: resp = client.send(req) _raise_for_non_200(resp) - - # Store the initial response for metadata + # Keep the original-request response as the "canonical" one for + # ``md.url`` reproducibility; ``.headers`` and ``.elapsed`` get + # overwritten with latest/cumulative values below. initial_response = resp + total_elapsed = resp.elapsed # Grab some aspects of the original request: headers and the # request type (GET or POST) @@ -723,6 +749,7 @@ def _walk_pages( ) _raise_for_non_200(resp) dfs.append(_get_resp_data(resp, geopd=geopd)) + total_elapsed += resp.elapsed curr_url = _next_req_url(resp) except Exception as e: # noqa: BLE001 logger.warning( @@ -730,6 +757,8 @@ def _walk_pages( ) raise RuntimeError(_paginated_failure_message(len(dfs), e)) from e + _finalize_paginated_response(initial_response, resp, total_elapsed) + # Concatenate all pages at once for efficiency return pd.concat(dfs, ignore_index=True), initial_response finally: @@ -957,17 +986,20 @@ def get_ogc_data( return return_list, BaseMetadata(response) +@chunking.multi_value_chunked(build_request=_construct_api_requests) @filters.chunked(build_request=_construct_api_requests) def _fetch_once( args: dict[str, Any], ) -> tuple[pd.DataFrame, requests.Response]: """Send one prepared-args OGC request; return the frame + response. - Filter chunking is added orthogonally by the ``@filters.chunked`` - decorator: with no filter (or an un-chunkable one) the decorator - passes ``args`` through to this body; with a chunkable filter it - fans out and calls this body once per sub-filter, then combines. - Either way the return shape is ``(frame, response)``. + Two orthogonal chunkers wrap this body. ``@chunking.multi_value_chunked`` + (outer) splits multi-value list params (e.g. ``monitoring_location_id``) + across sub-requests so each URL fits the server byte limit; the + cartesian product of per-dim chunks is iterated. ``@filters.chunked`` + (inner) splits long cql-text filters at top-level ``OR``. With no + chunkable inputs both pass through unchanged. Either way the return + shape is ``(frame, response)``. """ req = _construct_api_requests(**args) return _walk_pages(geopd=GEOPANDAS, req=req) @@ -1158,9 +1190,11 @@ def get_stats_data( try: resp = client.send(req) _raise_for_non_200(resp) - - # Store the initial response for metadata + # Keep the original-request response as the "canonical" one for + # ``md.url`` reproducibility; ``.headers`` and ``.elapsed`` get + # overwritten with latest/cumulative values below. initial_response = resp + total_elapsed = resp.elapsed # Grab some aspects of the original request: headers and the # request type (GET or POST) @@ -1186,6 +1220,7 @@ def get_stats_data( _raise_for_non_200(resp) body = resp.json() all_dfs.append(_handle_stats_nesting(body, geopd=GEOPANDAS)) + total_elapsed += resp.elapsed next_token = body["next"] except Exception as e: # noqa: BLE001 logger.warning( @@ -1196,6 +1231,8 @@ def get_stats_data( ) raise RuntimeError(_paginated_failure_message(len(all_dfs), e)) from e + _finalize_paginated_response(initial_response, resp, total_elapsed) + dfs = pd.concat(all_dfs, ignore_index=True) if len(all_dfs) > 1 else all_dfs[0] # . If expand percentiles is True, make each percentile diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index 18e78594..30637a5e 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -1,7 +1,9 @@ import datetime import json +import math import sys from unittest import mock +from urllib.parse import quote_plus import pandas as pd import pytest @@ -10,6 +12,8 @@ if sys.version_info < (3, 10): pytest.skip("Skip entire module on Python < 3.10", allow_module_level=True) +from dataretrieval.waterdata import chunking as _chunking +from dataretrieval.waterdata import filters as _filters from dataretrieval.waterdata import ( get_channel, get_combined_metadata, @@ -28,6 +32,18 @@ get_stats_por, get_time_series_metadata, ) +from dataretrieval.waterdata.chunking import ( + _DEFAULT_MAX_CHUNKS, + _DEFAULT_QUOTA_SAFETY_FLOOR, + _QUOTA_HEADER, + QuotaExhausted, + RequestTooLarge, + _chunkable_params, + _filter_aware_probe_args, + _plan_chunks, + _read_remaining, + multi_value_chunked, +) from dataretrieval.waterdata.utils import ( _check_monitoring_location_id, _check_profiles, @@ -207,6 +223,551 @@ def test_construct_api_requests_two_element_date_list_becomes_interval(): assert "time=2024-01-01%2F2024-01-31" in req.url +# ----- Multi-value GET-parameter chunker (chunking.py) ---------------------- +# +# These tests exercise the planner with a fake ``build_request`` whose URL +# byte length is a deterministic function of its inputs. Tests below model: +# - non-chunkable args contribute ``base_bytes``, +# - every multi-value list contributes ``len(",".join(map(str, v)))``, +# - the ``filter`` kwarg contributes ``len(filter)``. +# This isolates planner behaviour from the real HTTP request builder. + + +class _FakeReq: + __slots__ = ("url", "body") + + def __init__(self, url, body=None): + self.url = url + self.body = body + + +def _fake_build(*, base=200, **kwargs): + """Fake build_request: URL length deterministic in its inputs. + + Mirrors the GET-routed shape: payload goes in the URL, body is None. + List/string values are URL-encoded via ``quote_plus`` so the fake's + byte count matches what the real ``_construct_api_requests`` would + produce; otherwise an alphanumeric test could pass against the fake + but fail in production once values containing ``%``, ``+``, ``/``, + ``&`` etc. (which expand under encoding) reach the same code path. + """ + bytes_ = base + for v in kwargs.values(): + if isinstance(v, (list, tuple)): + bytes_ += len(quote_plus(",".join(map(str, v)))) + elif isinstance(v, str): + bytes_ += len(quote_plus(v)) + return _FakeReq("x" * bytes_) + + +def test_filter_aware_probe_args_passes_through_when_not_chunkable(): + """No filter, json-lang filter, single-clause filter — return unchanged.""" + assert _filter_aware_probe_args({"a": 1}) == {"a": 1} + assert _filter_aware_probe_args({"filter": "a='1'", "filter_lang": "cql-json"}) == { + "filter": "a='1'", + "filter_lang": "cql-json", + } + args = {"filter": "a='single clause with no OR'"} + assert _filter_aware_probe_args(args) == args + + +def test_filter_aware_probe_args_models_inner_chunker_bail_floor(): + """Chunkable filter → return args with filter replaced by a synthetic + ASCII clause whose URL byte count equals the inner chunker's bail + floor ``len(longest) * max(per_clause_encoding_ratio)``. Mirrors + ``filters._effective_filter_budget``'s worst-case model so the + planner doesn't approve plans the inner chunker would refuse.""" + args = {"filter": "a='1' OR a='22' OR a='333'", "x": 7} + probe = _filter_aware_probe_args(args) + parts = ["a='1'", "a='22'", "a='333'"] + expected = math.ceil( + max(len(p) for p in parts) * max(len(quote_plus(p)) / len(p) for p in parts) + ) + assert len(probe["filter"]) == expected + assert probe["filter"].isascii() and probe["filter"].isalnum() + assert probe["x"] == 7 + assert args["filter"] == "a='1' OR a='22' OR a='333'" # input not mutated + + +def test_plan_chunks_returns_none_when_request_fits(): + """URL under limit → planner returns None, decorator passes through.""" + args = {"monitoring_location_id": ["A", "B", "C"]} + plan = _plan_chunks(args, _fake_build, url_limit=8000) + assert plan is None + + +def test_plan_chunks_returns_none_when_no_chunkable_lists(): + """No multi-value lists, however over-limit → planner can't help, returns None + (decorator falls through; server may 414 but that's not chunker's job).""" + args = {"monitoring_location_id": "scalar-only"} + plan = _plan_chunks(args, _fake_build, url_limit=10) + assert plan is None + + +def test_plan_chunks_greedy_halving_targets_largest_dim(): + """Two dims with one much larger — the heavy dim halves first.""" + args = { + "monitoring_location_id": ["X" * 30, "Y" * 30, "Z" * 30, "W" * 30], + "parameter_code": ["00060", "00065"], + } + # full URL ≈ 200 + 123 + 12 = 335; force splitting heavy dim only. + plan = _plan_chunks(args, _fake_build, url_limit=310) + assert len(plan["monitoring_location_id"]) > 1 + assert len(plan["parameter_code"]) == 1 # heavy-dim split was enough + + +def test_plan_chunks_raises_request_too_large_at_singleton_floor(): + """Limit below singleton-per-dim floor (with no chunkable filter to + fall back on) → RequestTooLarge with a clear message.""" + args = {"monitoring_location_id": ["A", "B"]} + # base=200 alone exceeds limit; no relief possible. + with pytest.raises(RequestTooLarge, match="multi-value parameter"): + _plan_chunks(args, _fake_build, url_limit=100) + + +def test_plan_chunks_coordinates_with_filter_chunker(monkeypatch): + """COORDINATION REGRESSION TEST. + + With the FULL filter in URL-length probes, singleton-per-dim URL still + exceeds the limit and the planner would raise RequestTooLarge. With + filter-aware probing, the planner models the per-sub-request URL as + ``worst-dim-chunk + longest-clause-after-encoding`` (the inner filter + chunker's bail floor — it returns the FULL filter if any single + clause exceeds the budget, so the longest clause is the smallest + floor it can guarantee). The probe fits, plan returns. + + Sanity-check the *negative*: with filter-aware probing disabled, the + same inputs would raise. + """ + clauses = [f"f='{i}'" for i in range(10)] + args = { + "monitoring_location_id": ["A" * 10, "B" * 10, "C" * 10, "D" * 10], + "filter": " OR ".join(clauses), + } + # singleton+full-filter ≈ 200 + 10 + 86 = 296 (over limit 240) — would raise. + # longest-clause probe model ≈ 200 + 10 + 5 = 215 (under limit) — plan succeeds. + # (Here all clauses are the same length, so longest == shortest; the + # encoding-ratio coordination matters for lopsided clauses.) + plan = _plan_chunks(args, _fake_build, url_limit=240) + assert plan is not None # coordination prevented the premature raise + assert len(plan["monitoring_location_id"]) > 1 # planner did split + + # Negative control: patch the probe helper to be a no-op (model "no + # filter awareness") and confirm the same inputs raise. + monkeypatch.setattr(_chunking, "_filter_aware_probe_args", lambda a: a) + with pytest.raises(RequestTooLarge): + _plan_chunks(args, _fake_build, url_limit=240) + + +def test_plan_chunks_probes_with_max_clause_not_min(): + """Regression: with lopsided OR-clauses (one short, one long), probing + with min(parts) lets the planner falsely declare a plan feasible that + the inner filter chunker can't actually deliver — it bails when any + single clause exceeds the per-sub-request budget. Probing with the + longest clause is the safe lower bound on per-sub-request filter + size, so the planner correctly raises when no plan can fit.""" + args = { + "sites": ["A" * 10, "B" * 10], + "filter": "x='1' OR x='" + "a" * 28 + "'", # 5-char and 33-char clauses + } + # base 200 + singleton sites 10 + min-clause 5 = 215 (limit 230 -> fits) + # base 200 + singleton sites 10 + max-clause 33 = 243 (limit 230 -> exceeds) + # With min: planner succeeds, but real URL with full filter (42) exceeds + # 230 -> server 414. With max: planner raises early, as it should. + with pytest.raises(RequestTooLarge): + _plan_chunks(args, _fake_build, url_limit=230) + + +def test_plan_chunks_still_raises_when_even_min_clause_doesnt_fit(): + """If the limit is so tight that singleton + shortest-clause STILL + exceeds it, filter chunker can't save us either — raise.""" + args = { + "monitoring_location_id": ["A" * 10, "B" * 10], + "filter": "x='12345' OR x='67890'", # min clause is 9 chars + } + # Singleton + min-clause ≈ 200 + 10 + 9 = 219; limit below that → unrecoverable. + with pytest.raises(RequestTooLarge): + _plan_chunks(args, _fake_build, url_limit=210) + + +def test_multi_value_chunked_passes_through_when_url_fits(): + """No planning needed → decorator calls underlying function exactly once + with the original args.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=8000) + def fetch(args): + calls.append(args) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch({"monitoring_location_id": ["A", "B"]}) + assert len(calls) == 1 + assert calls[0]["monitoring_location_id"] == ["A", "B"] + + +def test_multi_value_chunked_emits_cartesian_product(): + """Two chunkable dims, each split into 2 chunks → exactly 4 sub-calls, + each pairing one chunk from each dim.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + calls.append({k: v for k, v in args.items() if k in ("sites", "pcodes")}) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch( + { + "sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10], + "pcodes": ["P1" * 10, "P2" * 10, "P3" * 10, "P4" * 10], + } + ) + # Both heavy → planner should split both dims. Confirm a cartesian shape: + # every unique site-chunk pairs with every unique pcode-chunk. + sites_seen = {tuple(c["sites"]) for c in calls} + pcodes_seen = {tuple(c["pcodes"]) for c in calls} + assert len(calls) == len(sites_seen) * len(pcodes_seen) + assert len(sites_seen) > 1 + assert len(pcodes_seen) > 1 + + +def test_multi_value_chunked_emits_3d_cartesian_product(): + """Three chunkable dims, each forced to split → exhaustive cartesian + product across all three. Verifies the planner's halving loop handles + N>2 dims uniformly and the wrapper's ``itertools.product`` enumerates + every combination exactly once.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + calls.append(tuple(tuple(args[k]) for k in ("sites", "pcodes", "stats"))) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch( + { + "sites": ["S" * 12 + str(i) for i in range(4)], + "pcodes": ["P" * 12 + str(i) for i in range(4)], + "stats": ["T" * 12 + str(i) for i in range(4)], + } + ) + + # Three independent axes — every (site_chunk, pcode_chunk, stat_chunk) + # triple must appear exactly once. Confirm: + sites_seen = {c[0] for c in calls} + pcodes_seen = {c[1] for c in calls} + stats_seen = {c[2] for c in calls} + + assert len(sites_seen) > 1, "sites dim was not split" + assert len(pcodes_seen) > 1, "pcodes dim was not split" + assert len(stats_seen) > 1, "stats dim was not split" + + # Cartesian shape: # sub-calls == product of unique chunks across dims + expected = len(sites_seen) * len(pcodes_seen) * len(stats_seen) + assert len(calls) == expected, ( + f"expected {expected} cartesian-product sub-calls, got {len(calls)}" + ) + # And no triple repeats (exhaustive enumeration, no duplicates). + assert len(set(calls)) == len(calls) + # The chunked values, when unioned across calls, recover the original list. + assert {x for tup in sites_seen for x in tup} == { + "S" * 12 + str(i) for i in range(4) + } + assert {x for tup in pcodes_seen for x in tup} == { + "P" * 12 + str(i) for i in range(4) + } + assert {x for tup in stats_seen for x in tup} == { + "T" * 12 + str(i) for i in range(4) + } + + +def test_multi_value_chunked_lazy_url_limit(monkeypatch): + """``url_limit=None`` → resolve filters._WATERDATA_URL_BYTE_LIMIT at call + time, so tests that patch the constant affect this decorator too.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build) # url_limit defaults to None + def fetch(args): + calls.append(args) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + monkeypatch.setattr(_filters, "_WATERDATA_URL_BYTE_LIMIT", 240) + # 4 sites of 10 chars → exceeds 240 → planner splits. + fetch({"sites": ["S" * 10 + str(i) for i in range(4)]}) + assert len(calls) > 1, "patched constant should drive chunking" + + +def test_default_max_chunks_matches_hourly_api_quota(): + """The default cap mirrors the USGS Water Data API's documented + per-API-key hourly limit. Locking this in so future changes have to + explicitly acknowledge the quota.""" + assert _DEFAULT_MAX_CHUNKS == 1000 + + +def test_plan_chunks_raises_when_plan_exceeds_max_chunks(): + """A converged plan with more sub-requests than ``max_chunks`` must + raise rather than silently issue them and burn the user's API quota.""" + # 2 dims with long values, each needing many singleton-ish chunks. + # Pick chunk sizes that converge to a plan exceeding a tight cap. + args = { + "dim_a": [f"long-string-value-{i}" for i in range(50)], + "dim_b": [f"another-long-value-{i}" for i in range(50)], + } + # url_limit forces splitting; max_chunks=10 forces the cap to fire. + with pytest.raises(RequestTooLarge, match="exceeding max_chunks=10"): + _plan_chunks(args, _fake_build, url_limit=250, max_chunks=10) + + +def test_plan_chunks_respects_default_cap_without_explicit_arg(): + """Default kwarg path: ``max_chunks`` defaults to _DEFAULT_MAX_CHUNKS + when not specified, so direct callers (e.g., other library code) get + the same safety net as the decorator wrapper.""" + args = { + "dim_a": [f"v{i:03d}" for i in range(60)], + "dim_b": [f"v{i:03d}" for i in range(60)], + "dim_c": [f"v{i:03d}" for i in range(60)], + } + # Without explicit max_chunks: defaults to 1000. The plan for these + # inputs would emit > 1000 sub-requests at a tight limit, so should + # raise on default cap alone. + with pytest.raises(RequestTooLarge, match=r"max_chunks=1000"): + _plan_chunks(args, _fake_build, url_limit=220) + + +def test_multi_value_chunked_cap_override(): + """A decorator-time ``max_chunks`` override lets callers with higher + quotas raise the ceiling without monkeypatching the module constant.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=220, max_chunks=10) + def fetch(args): + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + with pytest.raises(RequestTooLarge, match="exceeding max_chunks=10"): + fetch( + { + "dim_a": [f"longer-v{i}" for i in range(30)], + "dim_b": [f"longer-v{i}" for i in range(30)], + } + ) + + +def _quota_response(remaining: int | str | None) -> mock.Mock: + """A mock requests.Response-like object whose ``x-ratelimit-remaining`` + header reflects the given value (None → header absent).""" + resp = mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + resp.headers = {} if remaining is None else {_QUOTA_HEADER: str(remaining)} + return resp + + +def test_read_remaining_parses_header(): + assert _read_remaining(_quota_response(42)) == 42 + + +def test_read_remaining_treats_missing_header_as_plenty(): + """Servers that don't echo a rate-limit header must not trigger + spurious QuotaExhausted aborts. Sentinel is a large integer so any + plausible safety floor compares cleanly.""" + assert _read_remaining(_quota_response(None)) >= 1_000_000 + + +def test_read_remaining_treats_malformed_header_as_plenty(): + """Defensive: non-integer header value → don't abort.""" + assert _read_remaining(_quota_response("not-a-number")) >= 1_000_000 + + +def test_default_quota_safety_floor(): + """Default floor lives at 50 — enough headroom for one final + chunked call's pagination spike without breaching the hourly cap.""" + assert _DEFAULT_QUOTA_SAFETY_FLOOR == 50 + + +def test_multi_value_chunked_aborts_when_quota_floor_breached(): + """Mid-call, when ``x-ratelimit-remaining`` drops below the floor, + the chunker must raise ``QuotaExhausted`` *before* issuing the next + sub-request — and the exception must carry the partial frame plus + the chunk offset so callers can resume.""" + # Build a fetch_once whose response 'remaining' header decrements + # through 200, 100, 40 (below floor=50), 10. + remaining_seq = iter([200, 100, 40, 10]) + page_idx = iter(range(10)) + + def fetch(args): + idx = next(page_idx) + return ( + pd.DataFrame( + {"site": list(args["sites"]), "page": [idx] * len(args["sites"])} + ), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked( + build_request=_fake_build, + url_limit=240, + quota_safety_floor=50, + )(fetch) + + # Plan forces 4 sub-requests (4 singleton site chunks). + with pytest.raises(QuotaExhausted) as excinfo: + decorated({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + + err = excinfo.value + # Aborted after the 3rd sub-request (remaining=40 < floor=50). + assert err.completed_chunks == 3 + assert err.total_chunks == 4 + assert err.remaining == 40 + # Partial frame combines rows from the first three completed sub-requests. + assert err.partial_frame is not None + assert set(err.partial_frame["page"]) == {0, 1, 2} + + +def test_multi_value_chunked_does_not_abort_on_last_chunk(): + """Aborting on the final sub-request would be pointless — there's + no 'next' to protect. The check is skipped there. Earlier chunks + stay above the floor; only the last drops below, and we still + return cleanly because the check is skipped at i == total-1.""" + remaining_seq = iter([500, 5]) # only the LAST chunk dips below floor=50 + + def fetch(args): + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked( + build_request=_fake_build, + url_limit=240, + quota_safety_floor=50, + )(fetch) + + df, _ = decorated({"sites": ["S1" * 10, "S2" * 10]}) # forces 2 chunks + assert len(df) == 2 # no raise — both chunks ran + + +def test_multi_value_chunked_quota_check_disabled_with_zero_floor(): + """Setting the floor to 0 effectively disables the quota guard — + counter can go to 1 without aborting (since 1 > 0 = floor).""" + remaining_seq = iter([5, 1]) + + def fetch(args): + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked( + build_request=_fake_build, + url_limit=240, + quota_safety_floor=0, + )(fetch) + df, _ = decorated({"sites": ["S1" * 10, "S2" * 10]}) + assert len(df) == 2 # no raise + + +def test_quota_exhausted_message_includes_resume_offset(): + """The error message must point the user at the chunk offset to + resume from, otherwise the partial_frame attribute is a footgun + — the user has no way to know which chunks still need re-issuing.""" + e = QuotaExhausted( + partial_frame=pd.DataFrame(), + partial_response=mock.Mock(), + completed_chunks=7, + total_chunks=20, + remaining=12, + ) + msg = str(e) + assert "7/20" in msg + assert "12" in msg + assert "QuotaExhausted" in msg or "resume" in msg + + +def test_request_bytes_rejects_non_sizable_body(): + """``_request_bytes`` requires a deterministic byte count up front; + silently treating an unknown body as zero would under-chunk and let + the request blow past the server's POST-body limit. Generators, + iterables, and file-like objects must surface as ``TypeError``.""" + from dataretrieval.waterdata.chunking import _request_bytes + + class _FakeReqWithGenBody: + url = "https://example.com/foo" + body = (b"x" for _ in range(3)) + + with pytest.raises(TypeError, match="cannot size a request body"): + _request_bytes(_FakeReqWithGenBody()) + + +def test_request_bytes_handles_supported_body_types(): + """Sanity-check the supported body types: None (GET), bytes (raw + POST), str (JSON-as-string POST).""" + from dataretrieval.waterdata.chunking import _request_bytes + + class _Req: + def __init__(self, url, body): + self.url = url + self.body = body + + assert _request_bytes(_Req("ab", None)) == 2 + assert _request_bytes(_Req("ab", b"cd")) == 4 + assert _request_bytes(_Req("ab", "cd")) == 4 + assert _request_bytes(_Req("ab", bytearray(b"cd"))) == 4 + + +def test_plan_chunks_rejects_non_positive_max_chunks(): + """``max_chunks < 1`` is meaningless and would silently bypass the + cap on the no-chunking-needed path (initial plan total = 1 and the + in-loop check only runs after a split). Reject early.""" + args = {"monitoring_location_id": ["A", "B", "C", "D"]} + with pytest.raises(ValueError, match="max_chunks must be >= 1"): + _plan_chunks(args, _fake_build, url_limit=1000, max_chunks=0) + with pytest.raises(ValueError, match="max_chunks must be >= 1"): + _plan_chunks(args, _fake_build, url_limit=1000, max_chunks=-5) + + +def test_multi_value_chunked_restores_canonical_url(): + """When chunking fans out, the aggregated response's ``.url`` must + reflect the *user's original* query (rebuilt from the unchunked + args), not the first chunk's URL. Callers logging ``md.url`` for + reproducibility need the full query.""" + sites = ["S" * 10 + str(i) for i in range(4)] + sub_urls: list[str] = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + # Each sub-response carries the chunked sub_args's URL, so + # without canonical restoration the first chunk's URL would + # leak through to md.url. + sub_url = _fake_build(**args).url + sub_urls.append(sub_url) + resp = mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + resp.headers = {} + resp.url = sub_url + return pd.DataFrame(), resp + + _df, md = fetch({"sites": sites}) + + assert len(sub_urls) > 1, "test setup error: chunker didn't fan out" + # md.url must equal the URL the unchunked query would have produced. + assert md.url == _fake_build(sites=sites).url + # And differ from every sub-request's URL (each carries a smaller list). + assert all(md.url != u for u in sub_urls) + # The canonical URL is strictly bigger byte-wise than any sub-request. + assert all(len(md.url) > len(u) for u in sub_urls) + + +def test_chunkable_params_skips_filter_passed_as_list(): + """Defensive guard: ``filter`` is documented as a string. If a caller + mistakenly passes it as a list, the chunker must NOT treat it as a + multi-value dim — comma-joining CQL clauses inside the URL would + produce a malformed filter expression. The inner ``filters.chunked`` + is the only place that may shrink ``filter``.""" + args = { + "monitoring_location_id": ["USGS-A", "USGS-B"], + "filter": ["a='1'", "a='2'"], # malformed input + "filter_lang": ["cql-text", "cql-json"], # ditto + } + chunkable = _chunkable_params(args) + assert "monitoring_location_id" in chunkable + assert "filter" not in chunkable + assert "filter_lang" not in chunkable + + def test_samples_results(): """Test results call for proper columns""" df, _ = get_samples( From baae50a03be3719af83cd656a844582319de4925 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Sun, 17 May 2026 17:05:42 -0500 Subject: [PATCH 2/6] feat(waterdata): Add resume_from + ChunkManifest for partial chunked queries When a chunked OGC call fails partway through, the chunker now raises PartialResult (or its QuotaExhausted subclass) carrying the combined partial DataFrame plus a ChunkManifest recording how many sub-requests of the cartesian-product plan completed. The same getter accepts the partial metadata back via a new resume_from= kwarg; the chunker validates the saved plan matches the fresh args and fetches only the remaining combinations. Callers concatenate the saved partial frame with the resume call's return value to reconstruct the full result. - ChunkManifest dataclass (frozen, hashable) with plan / completed / total / is_complete / remaining - PartialResult base exception with partial_frame, partial_response, manifest, and a lazy partial_metadata property; QuotaExhausted now subclasses it and carries the manifest instead of bare counts - BaseMetadata.chunk_manifest exposes the manifest end-to-end - multi_value_chunked wrapper validates resume_from, skips completed cartesian-product combinations via itertools.islice, wraps any sub-request exception as PartialResult with __cause__ preserved - resume_from= added to all 11 chunked-getter signatures - 14 new tests covering manifest properties, resume validation (mismatched plan / no manifest / already-complete / no chunking), PartialResult wrapping of fetch errors, end-to-end quota-exhaust- then-resume, and partial_metadata wrapping --- NEWS.md | 2 + dataretrieval/utils.py | 5 + dataretrieval/waterdata/api.py | 77 ++++++ dataretrieval/waterdata/chunking.py | 322 +++++++++++++++++++---- tests/waterdata_test.py | 392 +++++++++++++++++++++++++++- 5 files changed, 749 insertions(+), 49 deletions(-) diff --git a/NEWS.md b/NEWS.md index 3d9a5814..ce591cd6 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,5 @@ +**05/17/2026:** Chunked `waterdata` calls that fail partway through are now resumable. Any sub-request failure (quota exhaustion, mid-pagination 5xx/429, transport error) raises `PartialResult` (or its `QuotaExhausted` subclass) carrying the combined partial DataFrame, a `BaseMetadata.partial_metadata` accessor, and a `ChunkManifest` recording how many sub-requests of the cartesian-product plan completed. The same getter accepts the partial metadata via a new `resume_from=` kwarg; the chunker validates the saved plan matches the fresh args and fetches only the remaining cartesian-product combinations. Callers concatenate their saved partial DataFrame with the resume call's return value to reconstruct the full result. The manifest is also attached to `BaseMetadata.chunk_manifest` on successful chunked calls for observability. + **05/17/2026:** The OGC `waterdata` getters (`get_daily`, `get_continuous`, `get_field_measurements`, and the rest of the multi-value-capable functions) now transparently chunk requests whose URLs would otherwise exceed the server's ~8 KB byte limit. A common chained-query pattern — pull a long site list from `get_monitoring_locations`, then feed it into `get_daily` — previously failed with HTTP 414 once the resulting URL grew past the limit; it now fans out across multiple sub-requests under the hood and returns one combined DataFrame. The chunker coordinates with the existing CQL `filter` chunker (long top-level-`OR` filters still split correctly when used alongside long multi-value lists), caps cartesian-product plans at 1000 sub-requests (the default USGS hourly quota), and aborts mid-call with a structured `QuotaExhausted` exception — carrying the partial result and a resume offset — if `x-ratelimit-remaining` drops below a safety floor. Mirrors R `dataRetrieval`'s [#870](https://github.com/DOI-USGS/dataRetrieval/pull/870), generalized to N dimensions. Note one metadata-behavior change for paginated/chunked calls: `BaseMetadata.url` still reflects the user's original query (unchanged), but `BaseMetadata.header` now carries the *last* page's / sub-request's headers (so `x-ratelimit-remaining` is current) rather than the first, and `BaseMetadata.query_time` is now the cumulative wall-clock across pages instead of the first page's elapsed. **05/16/2026:** Fixed silent truncation in the paginated `waterdata` request loops (`_walk_pages` and `get_stats_data`). Mid-pagination failures (HTTP 429, 5xx, network error) were previously swallowed — pagination would quietly stop and the function would return whatever rows it had collected, leaving callers with truncated DataFrames they had no way to detect. The loops now status-check every page like the initial request and raise `RuntimeError` on any failure, with the upstream exception chained as `__cause__` and a short menu of recovery actions (wait and retry, reduce the request, or obtain an API token) in the message. **Behavior change**: callers that previously consumed partial DataFrames on transient upstream blips will now see an exception; retry the call (possibly with a smaller `limit` or narrower query). diff --git a/dataretrieval/utils.py b/dataretrieval/utils.py index 76bbb6ad..6ec24935 100644 --- a/dataretrieval/utils.py +++ b/dataretrieval/utils.py @@ -230,6 +230,11 @@ def __init__(self, response) -> None: self.query_time = response.elapsed self.header = response.headers self.comment = None + # Set by ``waterdata.chunking.multi_value_chunked`` when a request + # was split into sub-requests. ``None`` for non-chunked calls. See + # ``ChunkManifest`` for how callers use this to resume a partial + # query. + self.chunk_manifest = getattr(response, "chunk_manifest", None) # # not sure what statistic_info is # self.statistic_info = None diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index 025aafcd..bae04ecf 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -57,6 +57,7 @@ def get_daily( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the day. @@ -189,6 +190,12 @@ def get_daily( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. Returns ------- @@ -272,6 +279,7 @@ def get_continuous( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """ Continuous data provide instantanous water conditions. @@ -399,6 +407,12 @@ def get_continuous( convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. Returns ------- @@ -492,6 +506,7 @@ def get_monitoring_locations( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Location information is basic information about the monitoring location including the name, identifier, agency responsible for data collection, and @@ -707,6 +722,12 @@ def get_monitoring_locations( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. Returns ------- @@ -770,6 +791,7 @@ def get_time_series_metadata( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data and continuous measurements are grouped into time series, which represent a collection of observations of a single parameter, @@ -930,6 +952,12 @@ def get_time_series_metadata( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. Returns ------- @@ -1027,6 +1055,7 @@ def get_combined_metadata( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Get combined monitoring-location and time-series metadata. @@ -1127,6 +1156,12 @@ def get_combined_metadata( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. Returns ------- @@ -1215,6 +1250,7 @@ def get_latest_continuous( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """This endpoint provides the most recent observation for each time series of continuous data. Continuous data are collected via automated sensors @@ -1344,6 +1380,12 @@ def get_latest_continuous( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. Returns ------- @@ -1410,6 +1452,7 @@ def get_latest_daily( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the day. @@ -1541,6 +1584,12 @@ def get_latest_daily( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. Returns ------- @@ -1608,6 +1657,7 @@ def get_field_measurements( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Field measurements are physically measured values collected during a visit to the monitoring location. Field measurements consist of measurements @@ -1729,6 +1779,12 @@ def get_field_measurements( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. Returns ------- @@ -1792,6 +1848,7 @@ def get_field_measurements_metadata( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Get field-measurement metadata: one row per (location, parameter) series. @@ -1847,6 +1904,12 @@ def get_field_measurements_metadata( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. Returns ------- @@ -1913,6 +1976,7 @@ def get_peaks( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Get the annual peak streamflow / stage record for a monitoring location. @@ -1971,6 +2035,12 @@ def get_peaks( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. Returns ------- @@ -2710,6 +2780,7 @@ def get_channel( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """ Channel measurements taken as part of streamflow field measurements. @@ -2823,6 +2894,12 @@ def get_channel( convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. Returns ------- diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index bbdf5b12..142ee0a8 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -35,10 +35,12 @@ from __future__ import annotations +import datetime import functools import itertools import math from collections.abc import Callable +from dataclasses import dataclass from typing import Any from urllib.parse import quote_plus @@ -113,39 +115,98 @@ class RequestTooLarge(ValueError): would issue more than ``max_chunks`` sub-requests.""" -class QuotaExhausted(RuntimeError): - """Raised mid-chunked-call when the API's reported remaining quota - (``x-ratelimit-remaining`` header) drops below the configured safety - floor. The chunker stops before issuing the next sub-request to - avoid a mid-call HTTP 429 that would silently truncate paginated - results (see PR #273 for the pagination side of that bug). +# Normalized plan shape: a tuple of ``(dim_name, tuple-of-chunk-tuples)`` +# pairs. Used for hashing/equality on ``ChunkManifest`` and to compare a +# resumed call's fresh plan against the saved manifest's plan. +_NormalizedPlan = tuple[tuple[str, tuple[tuple[Any, ...], ...]], ...] + + +def _normalize_plan(plan: dict[str, list[list[Any]]]) -> _NormalizedPlan: + """Convert the planner's mutable nested-list plan to an immutable, + comparable nested-tuple form. Preserves insertion order, which is + the cartesian-product iteration order (Python 3.7+ dict guarantee).""" + return tuple( + (key, tuple(tuple(chunk) for chunk in chunks)) for key, chunks in plan.items() + ) + + +@dataclass(frozen=True) +class ChunkManifest: + """Snapshot of a chunked call's progress, sufficient to resume. + + Attached to ``BaseMetadata.chunk_manifest`` on every chunked call + (successful or failed). On a failed call the manifest records how + many sub-requests completed; passing the partial metadata back via + ``resume_from=`` re-runs only the remaining cartesian-product + combinations. Pinning the normalized plan (not just the input + args) lets resume detect when a caller has changed their inputs + between the original call and the retry — same-looking args that + chunk differently would silently re-fetch wrong sub-ranges. + + Attributes + ---------- + plan : tuple + Normalized form of the chunker's cartesian-product plan: a + tuple of ``(dim_name, tuple-of-chunk-tuples)`` pairs, in the + order the planner iterates them. + completed : int + Number of sub-requests that completed before the call + terminated. Equal to ``total`` on a fully successful call. + """ + + plan: _NormalizedPlan + completed: int + + @property + def total(self) -> int: + """Total sub-requests in the cartesian-product plan.""" + return math.prod(len(chunks) for _, chunks in self.plan) + + @property + def is_complete(self) -> bool: + """``True`` when every sub-request in the plan completed.""" + return self.completed >= self.total + + @property + def remaining(self) -> int: + """Number of sub-requests still to fetch on resume.""" + return max(self.total - self.completed, 0) - The exception carries everything needed to resume: the combined - partial frame from completed sub-requests, the metadata for the - last successful sub-request, the number of chunks completed out of - the plan total, and the last-observed ``remaining`` value. + def __repr__(self) -> str: + return ( + f"ChunkManifest(completed={self.completed}/{self.total}, " + f"dims={len(self.plan)})" + ) + + +class PartialResult(RuntimeError): + """Raised mid-chunked-call when any sub-request fails. Carries the + combined partial frame and a ``ChunkManifest`` recording how many + sub-requests completed. Catch this exception to access partial + data, then re-call the original getter with the partial metadata + via ``resume_from=`` to fetch the remaining chunks. + + ``__cause__`` (set via ``raise ... from exc``) holds the underlying + exception — typically a ``RuntimeError`` from ``_walk_pages``'s + mid-pagination failure handler (see PR #279) or a transport-level + ``requests`` exception. Attributes ---------- partial_frame : pd.DataFrame Concatenated, deduplicated result of every sub-request that - completed before the floor was crossed. + completed in this call. Empty if the first attempted + sub-request failed. partial_response : requests.Response - Aggregated response with the canonical URL restored to the - user's full original query (rebuilt from ``args`` rather than - copied from the first sub-request, which only carries the - first chunk's parameters). Headers come from the last - completed sub-request (so callers inspecting - ``x-ratelimit-remaining`` see current quota state), and - ``elapsed`` is summed across completed sub-requests. Wrap in - ``BaseMetadata`` to surface to the caller alongside the - partial frame. - completed_chunks : int - Number of sub-requests successfully completed. - total_chunks : int - Total sub-requests in the cartesian-product plan. - remaining : int - Last observed ``x-ratelimit-remaining`` value. + Aggregated response carrying the canonical URL (the user's + full original query), last successful sub-request's headers, + summed ``elapsed``, and ``chunk_manifest`` attribute set so + ``BaseMetadata(partial_response).chunk_manifest`` exposes the + manifest. + manifest : ChunkManifest + Records the chunk plan and the number of completed + sub-requests. Pass the wrapping metadata back via + ``resume_from=`` to resume. """ def __init__( @@ -153,20 +214,64 @@ def __init__( *, partial_frame: pd.DataFrame, partial_response: requests.Response, - completed_chunks: int, - total_chunks: int, - remaining: int, + manifest: ChunkManifest, + message: str | None = None, ) -> None: super().__init__( - f"x-ratelimit-remaining dropped to {remaining} after " - f"{completed_chunks}/{total_chunks} chunks; aborting to avoid " - f"mid-call HTTP 429. Catch QuotaExhausted to access " - f".partial_frame and resume from chunk {completed_chunks}." + message + or ( + f"Chunked request failed after " + f"{manifest.completed}/{manifest.total} sub-requests. " + f"Catch PartialResult to access .partial_frame and " + f"resume_from=partial_metadata on a retry." + ) ) self.partial_frame = partial_frame self.partial_response = partial_response - self.completed_chunks = completed_chunks - self.total_chunks = total_chunks + self.manifest = manifest + + @property + def partial_metadata(self): + """``BaseMetadata`` wrapping ``partial_response``. Lazy so the + chunker module stays decoupled from ``dataretrieval.utils`` at + import time (avoids a circular-import-shaped surface).""" + from dataretrieval.utils import BaseMetadata + + return BaseMetadata(self.partial_response) + + +class QuotaExhausted(PartialResult): + """Raised mid-chunked-call when the API's reported remaining quota + (``x-ratelimit-remaining`` header) drops below the configured + safety floor. The chunker stops before issuing the next + sub-request to avoid a mid-call HTTP 429 that would silently + truncate paginated results (see PR #273). + + Inherits ``partial_frame``, ``partial_response``, and ``manifest`` + from ``PartialResult``. Adds ``remaining`` (the last observed + header value). + """ + + def __init__( + self, + *, + partial_frame: pd.DataFrame, + partial_response: requests.Response, + manifest: ChunkManifest, + remaining: int, + ) -> None: + super().__init__( + partial_frame=partial_frame, + partial_response=partial_response, + manifest=manifest, + message=( + f"x-ratelimit-remaining dropped to {remaining} after " + f"{manifest.completed}/{manifest.total} chunks; aborting to " + f"avoid mid-call HTTP 429. Catch QuotaExhausted to access " + f".partial_frame and resume_from=partial_metadata after the " + f"hourly window resets." + ), + ) self.remaining = remaining @@ -367,6 +472,91 @@ def _read_remaining(response: requests.Response) -> int: return _QUOTA_UNKNOWN +def _build_partial_failure( + *, + frames: list[pd.DataFrame], + responses: list[requests.Response], + canonical_url: str, + plan_norm: _NormalizedPlan, + completed: int, + sub_index: int, + total: int, + cause: BaseException, +) -> PartialResult: + """Assemble a ``PartialResult`` for a sub-request that errored. + + ``completed`` is the count of sub-requests that finished before the + failure (so a fresh ``ChunkManifest(completed=completed)`` resumes at + the failed one). ``responses`` may be empty (failure on the very + first attempted sub-request, including the first chunk of a resume + call); in that case the synthesized response carries only the + canonical URL and an empty header set, with the manifest still + attached so caller-side ``BaseMetadata.chunk_manifest`` works. + """ + if responses: + partial = _combine_chunk_responses(responses) + else: + partial = requests.Response() + partial.elapsed = datetime.timedelta(0) + partial.url = canonical_url + manifest = ChunkManifest(plan=plan_norm, completed=completed) + partial.chunk_manifest = manifest + partial_frame = _combine_chunk_frames(frames) if frames else pd.DataFrame() + return PartialResult( + partial_frame=partial_frame, + partial_response=partial, + manifest=manifest, + message=( + f"Chunked request failed at sub-request " + f"{sub_index + 1}/{total} ({type(cause).__name__}: {cause}). " + f"Catch PartialResult to access .partial_frame and resume " + f"with resume_from=partial_metadata." + ), + ) + + +def _resolve_resume( + resume_from: Any, plan: dict[str, list[list[Any]]] | None +) -> tuple[int, _NormalizedPlan]: + """Validate a ``resume_from`` metadata against a freshly computed + plan and return ``(start_index, normalized_plan)``. + + A resume call is only valid when the freshly chunked plan matches + the saved manifest exactly. Mismatched plans mean the caller's args + changed between the original call and the retry — silently + re-fetching with the new plan would produce a frame that + interleaves data from two incompatible queries. + """ + manifest = getattr(resume_from, "chunk_manifest", None) + if manifest is None: + raise ValueError( + "resume_from has no chunk_manifest. The original call was " + "not chunked (or the metadata is from a different source), " + "so there's nothing to resume." + ) + if plan is None: + raise ValueError( + "resume_from was provided but the current args do not " + "produce a chunk plan — the request fits in one round-trip. " + "Pass the same kwargs as the original call to resume." + ) + fresh = _normalize_plan(plan) + if fresh != manifest.plan: + raise ValueError( + "resume_from manifest does not match the current chunk plan. " + "The kwargs passed to this call would chunk differently than " + "the original. Pass identical kwargs (minus resume_from) to " + "resume; otherwise drop resume_from to issue a fresh query." + ) + if manifest.is_complete: + raise ValueError( + f"resume_from manifest is already complete " + f"({manifest.completed}/{manifest.total} chunks). There is " + f"nothing to resume; drop resume_from." + ) + return manifest.completed, fresh + + def multi_value_chunked( *, build_request: Callable[..., Any], @@ -391,11 +581,18 @@ def multi_value_chunked( Between sub-requests the wrapper reads ``x-ratelimit-remaining`` from each response. If it drops below ``quota_safety_floor`` (default ``_DEFAULT_QUOTA_SAFETY_FLOOR``), the wrapper raises ``QuotaExhausted`` - carrying the combined partial result and the chunk offset so callers - can resume after the hourly window resets, instead of crashing into - a mid-pagination HTTP 429 (which the upstream pagination loop in + carrying the combined partial result and a ``ChunkManifest`` so + callers can resume after the hourly window resets via + ``resume_from=partial_metadata``, instead of crashing into a + mid-pagination HTTP 429 (which the upstream pagination loop in ``_walk_pages`` historically truncated silently — see PR #273). + Any other failure inside a sub-request (transport errors, mid- + pagination ``RuntimeError`` from PR #279, inner-filter + ``RequestTooLarge``) is re-raised as ``PartialResult`` with the + same partial-state payload, with the underlying exception preserved + via ``__cause__``. + Sits OUTSIDE ``@filters.chunked``: list-chunking is the outer loop, filter-chunking is the inner loop. The wrapped function has the same signature as ``filters.chunked`` expects — ``(args: dict) -> (frame, @@ -413,9 +610,14 @@ def multi_value_chunked( order (Python 3.7+ guarantee), which equals the order in which chunkable params appeared in ``args``. For the public waterdata getters that order is the function-signature order, so - ``QuotaExhausted.completed_chunks`` maps to the same sub-requests - across repeated calls with the same arguments — resume is - well-defined. + ``ChunkManifest.completed`` maps to the same sub-requests across + repeated calls with the same arguments — resume is well-defined. + + Resume is triggered by passing ``resume_from=partial_metadata`` in + the caller's kwargs. The wrapper pops it before planning (so it + never reaches the underlying HTTP request), validates the saved + plan matches the fresh plan, and skips the already-completed + cartesian-product combinations. """ def decorator(fetch_once: _FetchOnce) -> _FetchOnce: @@ -433,7 +635,20 @@ def wrapper( if quota_safety_floor is not None else _DEFAULT_QUOTA_SAFETY_FLOOR ) + # ``resume_from`` is a chunker-only control kwarg; pull it + # out before any planner/URL probe runs so it can't reach + # the underlying API as a bogus query parameter. + resume_from = args.get("resume_from") + args = {k: v for k, v in args.items() if k != "resume_from"} + plan = _plan_chunks(args, build_request, limit, max_chunks) + + if resume_from is not None: + start_index, plan_norm = _resolve_resume(resume_from, plan) + else: + start_index = 0 + plan_norm = _normalize_plan(plan) if plan is not None else () + if plan is None: return fetch_once(args) @@ -449,9 +664,24 @@ def wrapper( total = _plan_total(plan) frames: list[pd.DataFrame] = [] responses: list[requests.Response] = [] - for i, combo in enumerate(itertools.product(*(plan[k] for k in keys))): + combos = itertools.islice( + itertools.product(*(plan[k] for k in keys)), start_index, None + ) + for i, combo in enumerate(combos, start=start_index): sub_args = {**args, **dict(zip(keys, combo))} - frame, response = fetch_once(sub_args) + try: + frame, response = fetch_once(sub_args) + except Exception as exc: + raise _build_partial_failure( + frames=frames, + responses=responses, + canonical_url=canonical_url, + plan_norm=plan_norm, + completed=i, + sub_index=i, + total=total, + cause=exc, + ) from exc frames.append(frame) responses.append(response) # Skip the quota check after the last sub-request — @@ -461,16 +691,18 @@ def wrapper( if remaining < floor: partial = _combine_chunk_responses(responses) partial.url = canonical_url + manifest = ChunkManifest(plan=plan_norm, completed=i + 1) + partial.chunk_manifest = manifest raise QuotaExhausted( partial_frame=_combine_chunk_frames(frames), partial_response=partial, - completed_chunks=i + 1, - total_chunks=total, + manifest=manifest, remaining=remaining, ) combined = _combine_chunk_responses(responses) combined.url = canonical_url + combined.chunk_manifest = ChunkManifest(plan=plan_norm, completed=total) return _combine_chunk_frames(frames), combined return wrapper # type: ignore[return-value] diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index 30637a5e..ee1b97ec 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -36,10 +36,13 @@ _DEFAULT_MAX_CHUNKS, _DEFAULT_QUOTA_SAFETY_FLOOR, _QUOTA_HEADER, + ChunkManifest, + PartialResult, QuotaExhausted, RequestTooLarge, _chunkable_params, _filter_aware_probe_args, + _normalize_plan, _plan_chunks, _read_remaining, multi_value_chunked, @@ -610,12 +613,16 @@ def fetch(args): err = excinfo.value # Aborted after the 3rd sub-request (remaining=40 < floor=50). - assert err.completed_chunks == 3 - assert err.total_chunks == 4 + assert err.manifest.completed == 3 + assert err.manifest.total == 4 assert err.remaining == 40 # Partial frame combines rows from the first three completed sub-requests. assert err.partial_frame is not None assert set(err.partial_frame["page"]) == {0, 1, 2} + # The partial_response carries the manifest so callers can resume. + assert err.partial_response.chunk_manifest is err.manifest + assert err.manifest.is_complete is False + assert err.manifest.remaining == 1 def test_multi_value_chunked_does_not_abort_on_last_chunk(): @@ -665,11 +672,14 @@ def test_quota_exhausted_message_includes_resume_offset(): """The error message must point the user at the chunk offset to resume from, otherwise the partial_frame attribute is a footgun — the user has no way to know which chunks still need re-issuing.""" + manifest = ChunkManifest( + plan=(("sites", tuple(("s" + str(i),) for i in range(20))),), + completed=7, + ) e = QuotaExhausted( partial_frame=pd.DataFrame(), partial_response=mock.Mock(), - completed_chunks=7, - total_chunks=20, + manifest=manifest, remaining=12, ) msg = str(e) @@ -768,6 +778,380 @@ def test_chunkable_params_skips_filter_passed_as_list(): assert "filter_lang" not in chunkable +# -- ChunkManifest + resume_from -------------------------------------------- + + +def test_chunk_manifest_basic_properties(): + """``total`` is cartesian-product cardinality; ``is_complete`` and + ``remaining`` are derived from ``completed``.""" + plan = _normalize_plan({"a": [["x"], ["y"]], "b": [["1"], ["2"], ["3"]]}) + m = ChunkManifest(plan=plan, completed=0) + assert m.total == 6 + assert m.is_complete is False + assert m.remaining == 6 + + m = ChunkManifest(plan=plan, completed=4) + assert m.remaining == 2 + assert m.is_complete is False + + m = ChunkManifest(plan=plan, completed=6) + assert m.is_complete is True + assert m.remaining == 0 + + +def test_chunk_manifest_is_hashable_and_immutable(): + """``frozen=True`` lets users stash a manifest in a set or use it as + a dict key when bookkeeping partial-resume state across calls.""" + import dataclasses + + plan = _normalize_plan({"a": [["x"], ["y"]]}) + m1 = ChunkManifest(plan=plan, completed=1) + m2 = ChunkManifest(plan=plan, completed=1) + assert m1 == m2 and hash(m1) == hash(m2) + with pytest.raises(dataclasses.FrozenInstanceError): + m1.completed = 99 # type: ignore[misc] + + +def test_normalize_plan_preserves_insertion_order(): + """Cartesian-product iteration order is dict-insertion order; the + normalized plan must keep it stable for the resume validity check.""" + p1 = _normalize_plan({"a": [["1"]], "b": [["x"]]}) + p2 = _normalize_plan({"b": [["x"]], "a": [["1"]]}) # reversed insertion + assert p1 != p2 # order is significant — same chunks, different iteration + + +def test_multi_value_chunked_attaches_manifest_on_success(): + """A successful chunked call attaches the manifest to the aggregated + response so callers can log ``md.chunk_manifest`` and confirm the + request fanned out, even when nothing went wrong.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(remaining=500), + ) + + _df, resp = fetch({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + manifest = resp.chunk_manifest + assert isinstance(manifest, ChunkManifest) + assert manifest.is_complete is True + assert manifest.completed == manifest.total > 1 + + +def test_multi_value_chunked_no_manifest_when_pass_through(): + """If the request fits in one round-trip the wrapper short-circuits + to ``fetch_once`` and no manifest is attached — caller-side + ``md.chunk_manifest`` (via ``BaseMetadata.__init__``'s ``getattr`` + default) would be ``None``.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=8000) + def fetch(args): + # Use a real ``requests.Response`` so attribute presence is honest: + # ``mock.Mock`` autocreates attributes on access, which would mask + # a missing ``chunk_manifest``. + import requests as _r # local to avoid module-level test dependency + + resp = _r.Response() + resp.elapsed = datetime.timedelta(seconds=0.1) + return pd.DataFrame(), resp + + _df, resp = fetch({"sites": ["A", "B"]}) + assert not hasattr(resp, "chunk_manifest") + + +def test_basemetadata_exposes_chunk_manifest(): + """BaseMetadata is the user's view onto a response — the manifest + must reach the caller through it without any extra wrapping.""" + from dataretrieval.utils import BaseMetadata + + plan = _normalize_plan({"a": [["x"], ["y"]]}) + manifest = ChunkManifest(plan=plan, completed=1) + resp = mock.Mock( + url="u", + elapsed=datetime.timedelta(seconds=0), + headers={}, + chunk_manifest=manifest, + ) + md = BaseMetadata(resp) + assert md.chunk_manifest is manifest + + # A response with no chunk_manifest attribute → md.chunk_manifest is None. + resp_plain = mock.Mock( + url="u", elapsed=datetime.timedelta(seconds=0), headers={}, spec=[] + ) + md_plain = BaseMetadata(resp_plain) + assert md_plain.chunk_manifest is None + + +def test_multi_value_chunked_wraps_walk_pages_failure_in_partial_result(): + """When a sub-request raises (e.g. PR #279's ``RuntimeError`` for + mid-pagination failure), the chunker re-raises as ``PartialResult`` + with the underlying exception preserved via ``__cause__`` and a + manifest pinned at the failing sub-request's index — so the caller + can resume from that exact offset.""" + call_count = {"n": 0} + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + call_count["n"] += 1 + if call_count["n"] == 3: + raise RuntimeError("simulated mid-pagination 5xx after 2 pages") + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(remaining=500), + ) + + with pytest.raises(PartialResult) as excinfo: + fetch({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + + err = excinfo.value + assert err.manifest.completed == 2 # two succeeded before the 3rd failed + assert err.manifest.total == 4 + assert isinstance(err.__cause__, RuntimeError) + assert "mid-pagination" in str(err.__cause__) + # Partial frame combines the two successful chunks. + assert len(err.partial_frame) == 2 + # The synthesized response carries the manifest for BaseMetadata. + assert err.partial_response.chunk_manifest is err.manifest + + +def test_partial_result_first_chunk_failure_returns_empty_frame(): + """Edge case: the very first sub-request fails. ``partial_frame`` is + empty, ``manifest.completed`` is 0, but the response still has the + canonical URL and manifest so a resume attempt can validate the plan.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(_args): + raise RuntimeError("transient transport failure") + + with pytest.raises(PartialResult) as excinfo: + fetch({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + + err = excinfo.value + assert err.manifest.completed == 0 + assert err.manifest.total == 4 + assert err.partial_frame.empty + assert err.partial_response.url # canonical URL set even with no responses + + +def test_partial_result_partial_metadata_wraps_response(): + """``PartialResult.partial_metadata`` lazy-wraps the response in + ``BaseMetadata`` so callers can grab the manifest without importing + ``BaseMetadata`` themselves at the catch site.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(_args): + raise RuntimeError("boom") + + with pytest.raises(PartialResult) as excinfo: + fetch({"sites": ["S1" * 10, "S2" * 10]}) + + md = excinfo.value.partial_metadata + assert md.chunk_manifest is excinfo.value.manifest + + +def test_resume_from_skips_completed_chunks(): + """``resume_from`` makes the wrapper skip the first ``manifest.completed`` + cartesian-product combinations and fetch only the remainder. The + returned frame contains only the *new* chunks' data — the caller is + responsible for concatenating their saved partial frame.""" + seen: list[tuple[str, ...]] = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + seen.append(tuple(args["sites"])) + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(remaining=500), + ) + + sites = ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10] + # First, run normally to get the plan that would have been used. + plan = _plan_chunks({"sites": sites}, _fake_build, url_limit=240) + plan_norm = _normalize_plan(plan) + + # Pretend a prior call completed 2 of 4 sub-requests. + prior_manifest = ChunkManifest(plan=plan_norm, completed=2) + md = mock.Mock(chunk_manifest=prior_manifest) + + df, resp = fetch({"sites": sites, "resume_from": md}) + # Only the last 2 cartesian-product combos should have run. + assert len(seen) == 2 + assert len(df) == sum(len(s) for s in seen) + # The returned response's manifest shows the full plan completed. + assert resp.chunk_manifest.is_complete + + +def test_resume_from_rejects_mismatched_plan(): + """If the caller changed their kwargs between the original failure + and the resume call, the fresh plan won't match the manifest's plan. + Silently re-fetching with a different plan would interleave data + from two incompatible queries — raise instead.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(_args): + pytest.fail("should not reach fetch_once on a mismatched-plan resume") + + plan = _plan_chunks( + {"sites": ["S" * 10 + str(i) for i in range(4)]}, + _fake_build, + url_limit=240, + ) + manifest = ChunkManifest(plan=_normalize_plan(plan), completed=1) + md = mock.Mock(chunk_manifest=manifest) + + with pytest.raises(ValueError, match="manifest does not match"): + fetch( + { + "sites": ["T" * 10 + str(i) for i in range(4)], # different values + "resume_from": md, + } + ) + + +def test_resume_from_rejects_no_manifest(): + """A metadata that wasn't returned by a chunked call has no + ``chunk_manifest``. Treating that as "fresh query" silently would + confuse users who think they're resuming — raise.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(_args): + pytest.fail("should not reach fetch_once when resume metadata is invalid") + + md = mock.Mock(chunk_manifest=None) + with pytest.raises(ValueError, match="no chunk_manifest"): + fetch({"sites": ["S" * 10 + str(i) for i in range(4)], "resume_from": md}) + + +def test_resume_from_rejects_complete_manifest(): + """A complete manifest has nothing to resume; the caller probably + has stale state. Raise so they notice rather than re-issuing the + whole plan (which we'd happily skip-to-end).""" + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(_args): + pytest.fail("should not reach fetch_once on an already-complete manifest") + + plan = _plan_chunks( + {"sites": ["S" * 10 + str(i) for i in range(4)]}, + _fake_build, + url_limit=240, + ) + plan_norm = _normalize_plan(plan) + total = math.prod(len(c) for _, c in plan_norm) + md = mock.Mock(chunk_manifest=ChunkManifest(plan=plan_norm, completed=total)) + + with pytest.raises(ValueError, match="already complete"): + fetch({"sites": ["S" * 10 + str(i) for i in range(4)], "resume_from": md}) + + +def test_resume_from_rejects_when_fresh_args_dont_chunk(): + """If the current args fit in one round-trip the wrapper would + short-circuit to ``fetch_once`` — there'd be no plan to skip + against. That state is almost certainly a caller bug (they probably + pruned their site list between calls), so raise.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=8000) + def fetch(_args): + pytest.fail("should not reach fetch_once when resume can't be validated") + + plan = _plan_chunks( + {"sites": ["S" * 10 + str(i) for i in range(4)]}, + _fake_build, + url_limit=240, + ) + manifest = ChunkManifest(plan=_normalize_plan(plan), completed=1) + md = mock.Mock(chunk_manifest=manifest) + + with pytest.raises(ValueError, match="do not produce a chunk plan"): + fetch({"sites": ["A", "B"], "resume_from": md}) + + +def test_resume_from_not_forwarded_to_underlying_fetch(): + """``resume_from`` is a chunker-only control kwarg; it must never + reach ``fetch_once`` (which would forward it to the URL builder as a + bogus query parameter).""" + forwarded: list[dict] = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + forwarded.append(args) + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(remaining=500), + ) + + sites = ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10] + plan = _plan_chunks({"sites": sites}, _fake_build, url_limit=240) + md = mock.Mock( + chunk_manifest=ChunkManifest(plan=_normalize_plan(plan), completed=2) + ) + + fetch({"sites": sites, "resume_from": md}) + assert forwarded, "fetch_once should have been called for the remaining chunks" + for sub_args in forwarded: + assert "resume_from" not in sub_args + + +def test_resume_e2e_quota_exhaust_then_resume_completes_query(): + """End-to-end: a first call exhausts quota partway through; the + caller catches ``QuotaExhausted``, saves ``(partial_df, + partial_metadata)``, then re-runs with ``resume_from=`` to fetch + the rest. Concatenating the two frames reproduces what a single + successful call would have returned.""" + # First call: 4 chunks, quota header decrements 500, 200, 100, then + # 40 (< floor=50). Quota check runs *after* each chunk but only + # gates the *next* one, so 3 chunks land then the 4th is skipped. + remaining_seq_1 = iter([500, 200, 40]) # only need first 3 — abort before 4th + pages_1 = iter(range(3)) + + def fetch_first(args): + return ( + pd.DataFrame( + { + "site": list(args["sites"]), + "page": [next(pages_1)] * len(args["sites"]), + } + ), + _quota_response(next(remaining_seq_1)), + ) + + decorated_first = multi_value_chunked( + build_request=_fake_build, url_limit=240, quota_safety_floor=50 + )(fetch_first) + + sites = ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10] + with pytest.raises(QuotaExhausted) as excinfo: + decorated_first({"sites": sites}) + partial_df = excinfo.value.partial_frame + partial_md = excinfo.value.partial_metadata + assert excinfo.value.manifest.completed == 3 + assert excinfo.value.manifest.total == 4 + + # Second call (after quota window resets): same args + resume_from. + # Only the final cartesian combo should run. + def fetch_second(args): + return ( + pd.DataFrame( + {"site": list(args["sites"]), "page": [99] * len(args["sites"])} + ), + _quota_response(remaining=800), + ) + + decorated_second = multi_value_chunked( + build_request=_fake_build, url_limit=240, quota_safety_floor=50 + )(fetch_second) + + rest_df, rest_md = decorated_second({"sites": sites, "resume_from": partial_md}) + assert rest_md.chunk_manifest.is_complete + assert list(rest_df["page"]) == [99] # exactly one chunk fetched on resume + + final = pd.concat([partial_df, rest_df], ignore_index=True) + assert set(final["page"]) == {0, 1, 2, 99} + assert len(final) == 4 # 4 singleton chunks total + + def test_samples_results(): """Test results call for proper columns""" df, _ = get_samples( From 64d1a6e2e15d7a0ed97f67bc352a95d1c1c5973c Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Sun, 17 May 2026 17:16:18 -0500 Subject: [PATCH 3/6] docs(waterdata): Add retry-loop example for resume_from Worked example in get_daily's docstring showing the canonical PartialResult catch / accumulate-partials / sleep-and-retry pattern, capped at a one-hour deadline matched to the API's hourly rate-limit window so structural failures surface rather than spin forever. Module-level multi_value_chunked docstring and the per-getter resume_from parameter doc now point to get_daily for the example. --- dataretrieval/waterdata/api.py | 78 +++++++++++++++++++++++++---- dataretrieval/waterdata/chunking.py | 4 +- 2 files changed, 70 insertions(+), 12 deletions(-) diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index bae04ecf..0f5f0a44 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -195,7 +195,8 @@ def get_daily( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. + kwargs as the original call. See ``get_daily`` for a worked + retry-loop example. Returns ------- @@ -252,6 +253,51 @@ def get_daily( ... parameter_code="00060", ... time="P7D", ... ) + + >>> # Resume loop: a heavy chunked query may exhaust the hourly + >>> # rate-limit budget partway through or hit a transient upstream + >>> # error. Catch ``PartialResult``, accumulate the partial frames, + >>> # and re-call with ``resume_from=`` to fetch only the + >>> # outstanding chunks. The USGS API rate-limit window is one + >>> # hour, so a total retry window of one hour is a sensible + >>> # ceiling — anything longer means the failure is structural, + >>> # not transient, and the loop should surface the error. + >>> import time + >>> import pandas as pd + >>> from dataretrieval import waterdata + >>> from dataretrieval.waterdata.chunking import PartialResult + >>> + >>> sites = sites_df["monitoring_location_id"].tolist() + >>> deadline = time.monotonic() + 3600 # one hour cap + >>> partials = [] + >>> md = None # carries the latest chunk_manifest between attempts + >>> attempt = 0 + >>> while True: + ... try: + ... df, md = waterdata.get_daily( + ... monitoring_location_id=sites, + ... parameter_code="00060", + ... time="P7D", + ... resume_from=md, # ``None`` on the first attempt + ... ) + ... break # full result fetched + ... except PartialResult as exc: + ... partials.append(exc.partial_frame) + ... md = exc.partial_metadata + ... if time.monotonic() >= deadline: + ... raise TimeoutError( + ... f"Could not complete chunked query within one hour " + ... f"({md.chunk_manifest.completed}/" + ... f"{md.chunk_manifest.total} chunks done)." + ... ) from exc + ... attempt += 1 + ... # Exponential backoff, capped at 10 minutes. Quota- + ... # reset failures benefit from a longer wait; transient + ... # transport errors clear quickly. ``min(...)`` ensures + ... # a tight cap; the outer deadline ensures we never wait + ... # past one hour total. + ... time.sleep(min(60 * 2 ** (attempt - 1), 600)) + >>> full = pd.concat([*partials, df], ignore_index=True) """ service = "daily" output_id = "daily_id" @@ -412,7 +458,8 @@ def get_continuous( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. + kwargs as the original call. See ``get_daily`` for a worked + retry-loop example. Returns ------- @@ -727,7 +774,8 @@ def get_monitoring_locations( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. + kwargs as the original call. See ``get_daily`` for a worked + retry-loop example. Returns ------- @@ -957,7 +1005,8 @@ def get_time_series_metadata( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. + kwargs as the original call. See ``get_daily`` for a worked + retry-loop example. Returns ------- @@ -1161,7 +1210,8 @@ def get_combined_metadata( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. + kwargs as the original call. See ``get_daily`` for a worked + retry-loop example. Returns ------- @@ -1385,7 +1435,8 @@ def get_latest_continuous( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. + kwargs as the original call. See ``get_daily`` for a worked + retry-loop example. Returns ------- @@ -1589,7 +1640,8 @@ def get_latest_daily( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. + kwargs as the original call. See ``get_daily`` for a worked + retry-loop example. Returns ------- @@ -1784,7 +1836,8 @@ def get_field_measurements( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. + kwargs as the original call. See ``get_daily`` for a worked + retry-loop example. Returns ------- @@ -1909,7 +1962,8 @@ def get_field_measurements_metadata( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. + kwargs as the original call. See ``get_daily`` for a worked + retry-loop example. Returns ------- @@ -2040,7 +2094,8 @@ def get_peaks( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. + kwargs as the original call. See ``get_daily`` for a worked + retry-loop example. Returns ------- @@ -2899,7 +2954,8 @@ def get_channel( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. + kwargs as the original call. See ``get_daily`` for a worked + retry-loop example. Returns ------- diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index 142ee0a8..511d7475 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -617,7 +617,9 @@ def multi_value_chunked( the caller's kwargs. The wrapper pops it before planning (so it never reaches the underlying HTTP request), validates the saved plan matches the fresh plan, and skips the already-completed - cartesian-product combinations. + cartesian-product combinations. See ``get_daily``'s docstring for + a worked retry-loop example using a one-hour deadline matched to + the API's rate-limit window. """ def decorator(fetch_once: _FetchOnce) -> _FetchOnce: From 8b14f6c88dd106679c8c7e161e12d49628db9214 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Sun, 17 May 2026 17:20:11 -0500 Subject: [PATCH 4/6] docs(waterdata): Move resume retry-loop example to user guide MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The example doesn't belong in get_daily's docstring — it's a topical explanation of an API contract that applies to every chunked getter, not a usage demo of one function. Move it to a dedicated Sphinx user- guide page (waterdata_chunking.rst) covering the chunker's resume contract, the canonical retry-loop pattern with a one-hour deadline, the four resume-validation failure modes, and how to inspect the chunk manifest on successful calls. multi_value_chunked's module docstring and the per-getter resume_from parameter doc now cross-reference the new page. --- dataretrieval/waterdata/api.py | 78 +++-------- dataretrieval/waterdata/chunking.py | 7 +- docs/source/userguide/index.rst | 1 + docs/source/userguide/waterdata_chunking.rst | 135 +++++++++++++++++++ 4 files changed, 162 insertions(+), 59 deletions(-) create mode 100644 docs/source/userguide/waterdata_chunking.rst diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index 0f5f0a44..3ef3ddc4 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -195,7 +195,8 @@ def get_daily( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. See ``get_daily`` for a worked + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked retry-loop example. Returns @@ -253,51 +254,6 @@ def get_daily( ... parameter_code="00060", ... time="P7D", ... ) - - >>> # Resume loop: a heavy chunked query may exhaust the hourly - >>> # rate-limit budget partway through or hit a transient upstream - >>> # error. Catch ``PartialResult``, accumulate the partial frames, - >>> # and re-call with ``resume_from=`` to fetch only the - >>> # outstanding chunks. The USGS API rate-limit window is one - >>> # hour, so a total retry window of one hour is a sensible - >>> # ceiling — anything longer means the failure is structural, - >>> # not transient, and the loop should surface the error. - >>> import time - >>> import pandas as pd - >>> from dataretrieval import waterdata - >>> from dataretrieval.waterdata.chunking import PartialResult - >>> - >>> sites = sites_df["monitoring_location_id"].tolist() - >>> deadline = time.monotonic() + 3600 # one hour cap - >>> partials = [] - >>> md = None # carries the latest chunk_manifest between attempts - >>> attempt = 0 - >>> while True: - ... try: - ... df, md = waterdata.get_daily( - ... monitoring_location_id=sites, - ... parameter_code="00060", - ... time="P7D", - ... resume_from=md, # ``None`` on the first attempt - ... ) - ... break # full result fetched - ... except PartialResult as exc: - ... partials.append(exc.partial_frame) - ... md = exc.partial_metadata - ... if time.monotonic() >= deadline: - ... raise TimeoutError( - ... f"Could not complete chunked query within one hour " - ... f"({md.chunk_manifest.completed}/" - ... f"{md.chunk_manifest.total} chunks done)." - ... ) from exc - ... attempt += 1 - ... # Exponential backoff, capped at 10 minutes. Quota- - ... # reset failures benefit from a longer wait; transient - ... # transport errors clear quickly. ``min(...)`` ensures - ... # a tight cap; the outer deadline ensures we never wait - ... # past one hour total. - ... time.sleep(min(60 * 2 ** (attempt - 1), 600)) - >>> full = pd.concat([*partials, df], ignore_index=True) """ service = "daily" output_id = "daily_id" @@ -458,7 +414,8 @@ def get_continuous( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. See ``get_daily`` for a worked + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked retry-loop example. Returns @@ -774,7 +731,8 @@ def get_monitoring_locations( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. See ``get_daily`` for a worked + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked retry-loop example. Returns @@ -1005,7 +963,8 @@ def get_time_series_metadata( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. See ``get_daily`` for a worked + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked retry-loop example. Returns @@ -1210,7 +1169,8 @@ def get_combined_metadata( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. See ``get_daily`` for a worked + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked retry-loop example. Returns @@ -1435,7 +1395,8 @@ def get_latest_continuous( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. See ``get_daily`` for a worked + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked retry-loop example. Returns @@ -1640,7 +1601,8 @@ def get_latest_daily( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. See ``get_daily`` for a worked + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked retry-loop example. Returns @@ -1836,7 +1798,8 @@ def get_field_measurements( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. See ``get_daily`` for a worked + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked retry-loop example. Returns @@ -1962,7 +1925,8 @@ def get_field_measurements_metadata( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. See ``get_daily`` for a worked + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked retry-loop example. Returns @@ -2094,7 +2058,8 @@ def get_peaks( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. See ``get_daily`` for a worked + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked retry-loop example. Returns @@ -2954,7 +2919,8 @@ def get_channel( ``QuotaExhausted``) exception from a previous call. The chunker consults its ``chunk_manifest`` to skip already-completed sub-requests and fetch only the remainder. Pass the same other - kwargs as the original call. See ``get_daily`` for a worked + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked retry-loop example. Returns diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index 511d7475..5cbd26fc 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -617,9 +617,10 @@ def multi_value_chunked( the caller's kwargs. The wrapper pops it before planning (so it never reaches the underlying HTTP request), validates the saved plan matches the fresh plan, and skips the already-completed - cartesian-product combinations. See ``get_daily``'s docstring for - a worked retry-loop example using a one-hour deadline matched to - the API's rate-limit window. + cartesian-product combinations. See the + ``waterdata-chunking-resume`` user-guide page for a worked + retry-loop example using a one-hour deadline matched to the + API's rate-limit window. """ def decorator(fetch_once: _FetchOnce) -> _FetchOnce: diff --git a/docs/source/userguide/index.rst b/docs/source/userguide/index.rst index 2ba5a93a..a9887f5d 100644 --- a/docs/source/userguide/index.rst +++ b/docs/source/userguide/index.rst @@ -15,3 +15,4 @@ Contents timeconventions dataportals + waterdata_chunking diff --git a/docs/source/userguide/waterdata_chunking.rst b/docs/source/userguide/waterdata_chunking.rst new file mode 100644 index 00000000..c0643839 --- /dev/null +++ b/docs/source/userguide/waterdata_chunking.rst @@ -0,0 +1,135 @@ +.. _waterdata-chunking-resume: + +Chunked Queries and Resuming After Failure +------------------------------------------ + +The OGC ``waterdata`` getters (``get_daily``, ``get_continuous``, +``get_field_measurements``, and the other multi-value-capable +functions) transparently split requests whose URLs would otherwise +exceed the USGS Water Data API's ~8 KB byte limit. A heavy chained +query — e.g. *"pull every stream site in Ohio, then their daily +discharge for the last week"* — fans out into many sub-requests under +the hood and returns one combined DataFrame. + +Long-running chunked calls can fail partway through. Two common +causes: + +- **Quota exhaustion.** The API rate-limits each HTTP request + (including pagination). The chunker monitors the + ``x-ratelimit-remaining`` header between sub-requests and aborts + before issuing the next one if the budget drops below the safety + floor. +- **Transient upstream errors.** A single sub-request can hit a 5xx, + a network blip, or a mid-pagination 429. + +Both cases raise :class:`~dataretrieval.waterdata.chunking.PartialResult` +(the quota case raises its subclass +:class:`~dataretrieval.waterdata.chunking.QuotaExhausted`). The +exception carries the combined partial DataFrame and a +:class:`~dataretrieval.waterdata.chunking.ChunkManifest` that records +how many sub-requests of the cartesian-product plan completed. + +The same getter accepts the partial metadata back via a +``resume_from=`` kwarg. The chunker validates that the freshly-planned +chunk layout matches the saved manifest, then issues only the +outstanding sub-requests. + +The Resume Pattern +****************** + +The canonical idiom: a loop that retries on ``PartialResult``, +accumulates each call's partial DataFrame, and threads the latest +metadata back into the next attempt as ``resume_from=``. The USGS API +rate-limit window is one hour, so a total retry deadline of one hour +is a sensible ceiling — anything longer means the failure is +structural, not transient, and the loop should surface the error +rather than spin forever. + +.. code:: python + + import time + import pandas as pd + from dataretrieval import waterdata + from dataretrieval.waterdata.chunking import PartialResult + + sites_df, _ = waterdata.get_monitoring_locations( + state_name="Ohio", + site_type="Stream", + ) + sites = sites_df["monitoring_location_id"].tolist() + + deadline = time.monotonic() + 3600 # one-hour cap + partials = [] + md = None # carries the latest chunk_manifest between attempts + attempt = 0 + + while True: + try: + df, md = waterdata.get_daily( + monitoring_location_id=sites, + parameter_code="00060", + time="P7D", + resume_from=md, # None on the first attempt + ) + break # full result fetched + except PartialResult as exc: + partials.append(exc.partial_frame) + md = exc.partial_metadata + if time.monotonic() >= deadline: + raise TimeoutError( + f"Could not complete chunked query within one hour " + f"({md.chunk_manifest.completed}/" + f"{md.chunk_manifest.total} chunks done)." + ) from exc + attempt += 1 + # Exponential backoff capped at 10 minutes. Quota-reset + # failures benefit from a longer wait; transient transport + # errors clear quickly. The outer deadline still bounds total + # wait time at one hour. + time.sleep(min(60 * 2 ** (attempt - 1), 600)) + + # Each partial frame plus the final ``df`` is disjoint, so a single + # ``concat`` reconstructs what a successful one-shot call would have + # returned. + full = pd.concat([*partials, df], ignore_index=True) + +How Resume Validates the Plan +***************************** + +``ChunkManifest`` pins the *normalized cartesian-product plan*, not +just the input kwargs. If a caller changes their inputs between the +original failure and the retry — even in ways that look equivalent — +the freshly-computed plan would differ from the saved one, and +silently re-fetching would interleave data from two incompatible +queries. The chunker raises ``ValueError`` instead, with one of four +explicit messages: + +- ``"resume_from has no chunk_manifest"`` — the metadata is from a + call that wasn't chunked (or from a different source entirely). +- ``"do not produce a chunk plan"`` — the current args fit in one + round-trip, so there is no plan to skip against. +- ``"manifest does not match the current chunk plan"`` — the input + list changed between calls. +- ``"already complete"`` — the saved manifest is fully consumed; + drop ``resume_from``. + +Inspecting the Manifest on Success +********************************** + +The manifest is also attached to ``BaseMetadata.chunk_manifest`` on +successful chunked calls, so callers can log fan-out information +without catching anything: + +.. code:: python + + df, md = waterdata.get_daily( + monitoring_location_id=sites, + parameter_code="00060", + time="P7D", + ) + if md.chunk_manifest is not None: + m = md.chunk_manifest + logger.info("query fanned out across %d sub-requests", m.total) + +For calls that did not need chunking, ``md.chunk_manifest`` is +``None``. From 4b441c20e652c8b42fd100ff984bd3ca7150511c Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Sun, 17 May 2026 17:31:16 -0500 Subject: [PATCH 5/6] refactor(waterdata): Dedupe chunker abort paths through _assemble_partial MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both abort sites (quota-exhausted bail and sub-request exception) and the success path now share one helper that combines responses, restores the canonical URL, builds the manifest, and attaches it to the response — eliminating the three near-identical inline blocks. Message formatting moves to a single _partial_result_message() so the three "Catch ... to access .partial_frame and resume" strings collapse to one. The "resume_from" kwarg literal becomes _RESUME_FROM_KEY for consistency with _QUOTA_HEADER, and the args-strip uses the standard dict()+pop() idiom. PR-number references that would rot in public docstrings dropped. --- dataretrieval/waterdata/chunking.py | 151 ++++++++++++++-------------- 1 file changed, 78 insertions(+), 73 deletions(-) diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index 5cbd26fc..b0d85152 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -99,6 +99,11 @@ # Response header USGS uses to advertise remaining hourly quota. _QUOTA_HEADER = "x-ratelimit-remaining" +# Reserved kwarg name the wrapper strips from ``args`` before any planner +# or URL probe runs — it's a chunker-only control parameter and must not +# reach the underlying API as a bogus query parameter. +_RESUME_FROM_KEY = "resume_from" + # Sentinel returned by ``_read_remaining`` when the response has no # parseable ``x-ratelimit-remaining`` header. Large enough to beat any # plausible safety floor so a missing/malformed header doesn't trigger @@ -215,31 +220,45 @@ def __init__( partial_frame: pd.DataFrame, partial_response: requests.Response, manifest: ChunkManifest, + cause: BaseException | None = None, message: str | None = None, ) -> None: - super().__init__( - message - or ( - f"Chunked request failed after " - f"{manifest.completed}/{manifest.total} sub-requests. " - f"Catch PartialResult to access .partial_frame and " - f"resume_from=partial_metadata on a retry." - ) - ) + super().__init__(message or _partial_result_message(manifest, cause)) self.partial_frame = partial_frame self.partial_response = partial_response self.manifest = manifest @property def partial_metadata(self): - """``BaseMetadata`` wrapping ``partial_response``. Lazy so the - chunker module stays decoupled from ``dataretrieval.utils`` at - import time (avoids a circular-import-shaped surface).""" + """``BaseMetadata`` wrapping ``partial_response``. Lazy to avoid + a circular import with ``dataretrieval.utils`` at module load.""" from dataretrieval.utils import BaseMetadata return BaseMetadata(self.partial_response) +def _partial_result_message( + manifest: ChunkManifest, cause: BaseException | None +) -> str: + """Single source of truth for the user-facing failure message. The + ``cause`` branch names the failing sub-request and underlying + exception (most common path); the no-cause branch is the generic + aborted-after-N form used by ``QuotaExhausted``'s pre-emptive bail.""" + tail = ( + " Catch PartialResult to access .partial_frame and resume with " + "resume_from=partial_metadata." + ) + if cause is None: + return ( + f"Chunked request failed after {manifest.completed}/" + f"{manifest.total} sub-requests.{tail}" + ) + return ( + f"Chunked request failed at sub-request {manifest.completed + 1}/" + f"{manifest.total} ({type(cause).__name__}: {cause}).{tail}" + ) + + class QuotaExhausted(PartialResult): """Raised mid-chunked-call when the API's reported remaining quota (``x-ratelimit-remaining`` header) drops below the configured @@ -268,8 +287,8 @@ def __init__( f"x-ratelimit-remaining dropped to {remaining} after " f"{manifest.completed}/{manifest.total} chunks; aborting to " f"avoid mid-call HTTP 429. Catch QuotaExhausted to access " - f".partial_frame and resume_from=partial_metadata after the " - f"hourly window resets." + f".partial_frame and resume with resume_from=partial_metadata " + f"after the hourly window resets." ), ) self.remaining = remaining @@ -357,9 +376,7 @@ def _request_bytes(req: requests.PreparedRequest) -> int: def _plan_total(plan: dict[str, list[list[Any]]]) -> int: """Sub-request count a plan will issue: the cartesian product of - per-dim chunk counts. Computed in two places (planner's max_chunks - early-bail and wrapper's QuotaExhausted payload) — centralized - here so the two can't drift.""" + per-dim chunk counts.""" return math.prod(len(chunks) for chunks in plan.values()) @@ -472,26 +489,19 @@ def _read_remaining(response: requests.Response) -> int: return _QUOTA_UNKNOWN -def _build_partial_failure( +def _assemble_partial( *, frames: list[pd.DataFrame], responses: list[requests.Response], canonical_url: str, plan_norm: _NormalizedPlan, completed: int, - sub_index: int, - total: int, - cause: BaseException, -) -> PartialResult: - """Assemble a ``PartialResult`` for a sub-request that errored. - - ``completed`` is the count of sub-requests that finished before the - failure (so a fresh ``ChunkManifest(completed=completed)`` resumes at - the failed one). ``responses`` may be empty (failure on the very - first attempted sub-request, including the first chunk of a resume - call); in that case the synthesized response carries only the - canonical URL and an empty header set, with the manifest still - attached so caller-side ``BaseMetadata.chunk_manifest`` works. +) -> tuple[pd.DataFrame, requests.Response, ChunkManifest]: + """Build the partial-state triple shared by every chunker abort path + (quota-exhausted, sub-request error, and the success path's final + response). ``responses`` may be empty (first attempted sub-request + failed); the synthesized response then carries only the canonical + URL so caller-side ``BaseMetadata.chunk_manifest`` still works. """ if responses: partial = _combine_chunk_responses(responses) @@ -502,17 +512,7 @@ def _build_partial_failure( manifest = ChunkManifest(plan=plan_norm, completed=completed) partial.chunk_manifest = manifest partial_frame = _combine_chunk_frames(frames) if frames else pd.DataFrame() - return PartialResult( - partial_frame=partial_frame, - partial_response=partial, - manifest=manifest, - message=( - f"Chunked request failed at sub-request " - f"{sub_index + 1}/{total} ({type(cause).__name__}: {cause}). " - f"Catch PartialResult to access .partial_frame and resume " - f"with resume_from=partial_metadata." - ), - ) + return partial_frame, partial, manifest def _resolve_resume( @@ -584,14 +584,12 @@ def multi_value_chunked( carrying the combined partial result and a ``ChunkManifest`` so callers can resume after the hourly window resets via ``resume_from=partial_metadata``, instead of crashing into a - mid-pagination HTTP 429 (which the upstream pagination loop in - ``_walk_pages`` historically truncated silently — see PR #273). + mid-pagination HTTP 429. - Any other failure inside a sub-request (transport errors, mid- - pagination ``RuntimeError`` from PR #279, inner-filter - ``RequestTooLarge``) is re-raised as ``PartialResult`` with the - same partial-state payload, with the underlying exception preserved - via ``__cause__``. + Any other failure inside a sub-request (transport errors, + mid-pagination ``RuntimeError``, inner-filter ``RequestTooLarge``) + is re-raised as ``PartialResult`` with the same partial-state + payload, with the underlying exception preserved via ``__cause__``. Sits OUTSIDE ``@filters.chunked``: list-chunking is the outer loop, filter-chunking is the inner loop. The wrapped function has the same @@ -638,11 +636,11 @@ def wrapper( if quota_safety_floor is not None else _DEFAULT_QUOTA_SAFETY_FLOOR ) - # ``resume_from`` is a chunker-only control kwarg; pull it - # out before any planner/URL probe runs so it can't reach - # the underlying API as a bogus query parameter. - resume_from = args.get("resume_from") - args = {k: v for k, v in args.items() if k != "resume_from"} + # ``resume_from`` is a chunker-only control kwarg; strip it + # before the planner runs so it can't reach the underlying + # API as a bogus query parameter. + args = dict(args) + resume_from = args.pop(_RESUME_FROM_KEY, None) plan = _plan_chunks(args, build_request, limit, max_chunks) @@ -655,12 +653,9 @@ def wrapper( if plan is None: return fetch_once(args) - # Pre-build the canonical URL representing the user's full - # original query. The chunker sends sub-requests with sliced - # multi-value lists; without this restore, the aggregated - # response's ``.url`` would only show the first chunk and - # callers logging ``md.url`` for reproducibility would see a - # truncated view of their own query. + # Rebuild the canonical original-query URL so the aggregated + # response's ``.url`` reflects the user's request, not the + # first sub-request's sliced view. canonical_url = build_request(**args).url keys = list(plan) @@ -675,14 +670,17 @@ def wrapper( try: frame, response = fetch_once(sub_args) except Exception as exc: - raise _build_partial_failure( + partial_frame, partial_response, manifest = _assemble_partial( frames=frames, responses=responses, canonical_url=canonical_url, plan_norm=plan_norm, completed=i, - sub_index=i, - total=total, + ) + raise PartialResult( + partial_frame=partial_frame, + partial_response=partial_response, + manifest=manifest, cause=exc, ) from exc frames.append(frame) @@ -692,21 +690,28 @@ def wrapper( if i < total - 1: remaining = _read_remaining(response) if remaining < floor: - partial = _combine_chunk_responses(responses) - partial.url = canonical_url - manifest = ChunkManifest(plan=plan_norm, completed=i + 1) - partial.chunk_manifest = manifest + partial_frame, partial_response, manifest = _assemble_partial( + frames=frames, + responses=responses, + canonical_url=canonical_url, + plan_norm=plan_norm, + completed=i + 1, + ) raise QuotaExhausted( - partial_frame=_combine_chunk_frames(frames), - partial_response=partial, + partial_frame=partial_frame, + partial_response=partial_response, manifest=manifest, remaining=remaining, ) - combined = _combine_chunk_responses(responses) - combined.url = canonical_url - combined.chunk_manifest = ChunkManifest(plan=plan_norm, completed=total) - return _combine_chunk_frames(frames), combined + combined_frame, combined, _ = _assemble_partial( + frames=frames, + responses=responses, + canonical_url=canonical_url, + plan_norm=plan_norm, + completed=total, + ) + return combined_frame, combined return wrapper # type: ignore[return-value] From 428afd52dad96e341416dc4e0911778f68ac5880 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Sun, 17 May 2026 18:33:54 -0500 Subject: [PATCH 6/6] feat(waterdata): Re-export chunker exceptions at package level MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ChunkManifest, PartialResult, QuotaExhausted, and RequestTooLarge are now available from ``dataretrieval.waterdata`` directly. Callers following the resume retry-loop pattern no longer need to reach into the ``dataretrieval.waterdata.chunking`` submodule to catch PartialResult — the public import matches where the getters live. multi_value_chunked's docstring now states explicitly that the wrapper catches every ``Exception`` (not just the three named example types) and that ``BaseException`` subclasses propagate unchanged, so callers know KeyboardInterrupt aborts a chunked call cleanly while a programmer-error TypeError still gets wrapped with its partial state. The userguide example uses the new top-level import. --- dataretrieval/waterdata/__init__.py | 10 ++++++++++ dataretrieval/waterdata/chunking.py | 11 +++++++---- docs/source/userguide/waterdata_chunking.rst | 2 +- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index f81966c4..c1a943c7 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -29,6 +29,12 @@ get_stats_por, get_time_series_metadata, ) +from .chunking import ( + ChunkManifest, + PartialResult, + QuotaExhausted, + RequestTooLarge, +) from .filters import FILTER_LANG from .nearest import get_nearest_continuous from .ratings import get_ratings @@ -45,6 +51,10 @@ "PROFILES", "PROFILE_LOOKUP", "SERVICES", + "ChunkManifest", + "PartialResult", + "QuotaExhausted", + "RequestTooLarge", "get_channel", "get_codes", "get_combined_metadata", diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py index b0d85152..d56bc730 100644 --- a/dataretrieval/waterdata/chunking.py +++ b/dataretrieval/waterdata/chunking.py @@ -586,10 +586,13 @@ def multi_value_chunked( ``resume_from=partial_metadata``, instead of crashing into a mid-pagination HTTP 429. - Any other failure inside a sub-request (transport errors, - mid-pagination ``RuntimeError``, inner-filter ``RequestTooLarge``) - is re-raised as ``PartialResult`` with the same partial-state - payload, with the underlying exception preserved via ``__cause__``. + Any other ``Exception`` raised inside a sub-request — transport + errors, mid-pagination ``RuntimeError``, inner-filter + ``RequestTooLarge``, or unexpected exceptions from the wrapped + function — is caught and re-raised as ``PartialResult`` with the + same partial-state payload, with the underlying exception preserved + via ``__cause__``. ``BaseException`` subclasses (``KeyboardInterrupt``, + ``SystemExit``) propagate unchanged. Sits OUTSIDE ``@filters.chunked``: list-chunking is the outer loop, filter-chunking is the inner loop. The wrapped function has the same diff --git a/docs/source/userguide/waterdata_chunking.rst b/docs/source/userguide/waterdata_chunking.rst index c0643839..ea326982 100644 --- a/docs/source/userguide/waterdata_chunking.rst +++ b/docs/source/userguide/waterdata_chunking.rst @@ -50,7 +50,7 @@ rather than spin forever. import time import pandas as pd from dataretrieval import waterdata - from dataretrieval.waterdata.chunking import PartialResult + from dataretrieval.waterdata import PartialResult sites_df, _ = waterdata.get_monitoring_locations( state_name="Ohio",