diff --git a/NEWS.md b/NEWS.md index 7761e29b..ce591cd6 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,7 @@ +**05/17/2026:** Chunked `waterdata` calls that fail partway through are now resumable. Any sub-request failure (quota exhaustion, mid-pagination 5xx/429, transport error) raises `PartialResult` (or its `QuotaExhausted` subclass) carrying the combined partial DataFrame, a `BaseMetadata.partial_metadata` accessor, and a `ChunkManifest` recording how many sub-requests of the cartesian-product plan completed. The same getter accepts the partial metadata via a new `resume_from=` kwarg; the chunker validates the saved plan matches the fresh args and fetches only the remaining cartesian-product combinations. Callers concatenate their saved partial DataFrame with the resume call's return value to reconstruct the full result. The manifest is also attached to `BaseMetadata.chunk_manifest` on successful chunked calls for observability. + +**05/17/2026:** The OGC `waterdata` getters (`get_daily`, `get_continuous`, `get_field_measurements`, and the rest of the multi-value-capable functions) now transparently chunk requests whose URLs would otherwise exceed the server's ~8 KB byte limit. A common chained-query pattern — pull a long site list from `get_monitoring_locations`, then feed it into `get_daily` — previously failed with HTTP 414 once the resulting URL grew past the limit; it now fans out across multiple sub-requests under the hood and returns one combined DataFrame. The chunker coordinates with the existing CQL `filter` chunker (long top-level-`OR` filters still split correctly when used alongside long multi-value lists), caps cartesian-product plans at 1000 sub-requests (the default USGS hourly quota), and aborts mid-call with a structured `QuotaExhausted` exception — carrying the partial result and a resume offset — if `x-ratelimit-remaining` drops below a safety floor. Mirrors R `dataRetrieval`'s [#870](https://github.com/DOI-USGS/dataRetrieval/pull/870), generalized to N dimensions. Note one metadata-behavior change for paginated/chunked calls: `BaseMetadata.url` still reflects the user's original query (unchanged), but `BaseMetadata.header` now carries the *last* page's / sub-request's headers (so `x-ratelimit-remaining` is current) rather than the first, and `BaseMetadata.query_time` is now the cumulative wall-clock across pages instead of the first page's elapsed. + **05/16/2026:** Fixed silent truncation in the paginated `waterdata` request loops (`_walk_pages` and `get_stats_data`). Mid-pagination failures (HTTP 429, 5xx, network error) were previously swallowed — pagination would quietly stop and the function would return whatever rows it had collected, leaving callers with truncated DataFrames they had no way to detect. The loops now status-check every page like the initial request and raise `RuntimeError` on any failure, with the upstream exception chained as `__cause__` and a short menu of recovery actions (wait and retry, reduce the request, or obtain an API token) in the message. **Behavior change**: callers that previously consumed partial DataFrames on transient upstream blips will now see an exception; retry the call (possibly with a smaller `limit` or narrower query). **05/07/2026:** Bumped the declared minimum Python version from **3.8** to **3.9** (`pyproject.toml`'s `requires-python` and the ruff target). This brings the manifest in line with what was already being tested — CI's matrix has long covered only 3.9, 3.13, and 3.14, the `waterdata` test module already skipped itself on Python < 3.10, and several modules already use 3.9-only stdlib (e.g. `zoneinfo`). Users on 3.8 will no longer be able to install the package; please upgrade. diff --git a/dataretrieval/utils.py b/dataretrieval/utils.py index 76bbb6ad..6ec24935 100644 --- a/dataretrieval/utils.py +++ b/dataretrieval/utils.py @@ -230,6 +230,11 @@ def __init__(self, response) -> None: self.query_time = response.elapsed self.header = response.headers self.comment = None + # Set by ``waterdata.chunking.multi_value_chunked`` when a request + # was split into sub-requests. ``None`` for non-chunked calls. See + # ``ChunkManifest`` for how callers use this to resume a partial + # query. + self.chunk_manifest = getattr(response, "chunk_manifest", None) # # not sure what statistic_info is # self.statistic_info = None diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index f81966c4..c1a943c7 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -29,6 +29,12 @@ get_stats_por, get_time_series_metadata, ) +from .chunking import ( + ChunkManifest, + PartialResult, + QuotaExhausted, + RequestTooLarge, +) from .filters import FILTER_LANG from .nearest import get_nearest_continuous from .ratings import get_ratings @@ -45,6 +51,10 @@ "PROFILES", "PROFILE_LOOKUP", "SERVICES", + "ChunkManifest", + "PartialResult", + "QuotaExhausted", + "RequestTooLarge", "get_channel", "get_codes", "get_combined_metadata", diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index ad268194..3ef3ddc4 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -57,6 +57,7 @@ def get_daily( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the day. @@ -189,6 +190,14 @@ def get_daily( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked + retry-loop example. Returns ------- @@ -230,6 +239,21 @@ def get_daily( ... parameter_code="00060", ... last_modified="P7D", ... ) + + >>> # Chain queries: pull all stream sites in a state, then their + >>> # daily discharge for the last week. The site list can be hundreds + >>> # of values long — the request is transparently chunked across + >>> # multiple sub-requests so the URL stays under the server's byte + >>> # limit. Combined output looks like a single query. + >>> sites_df, _ = dataretrieval.waterdata.get_monitoring_locations( + ... state_name="Ohio", + ... site_type="Stream", + ... ) + >>> df, md = dataretrieval.waterdata.get_daily( + ... monitoring_location_id=sites_df["monitoring_location_id"].tolist(), + ... parameter_code="00060", + ... time="P7D", + ... ) """ service = "daily" output_id = "daily_id" @@ -257,6 +281,7 @@ def get_continuous( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """ Continuous data provide instantanous water conditions. @@ -384,6 +409,14 @@ def get_continuous( convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked + retry-loop example. Returns ------- @@ -477,6 +510,7 @@ def get_monitoring_locations( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Location information is basic information about the monitoring location including the name, identifier, agency responsible for data collection, and @@ -692,6 +726,14 @@ def get_monitoring_locations( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked + retry-loop example. Returns ------- @@ -755,6 +797,7 @@ def get_time_series_metadata( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data and continuous measurements are grouped into time series, which represent a collection of observations of a single parameter, @@ -915,6 +958,14 @@ def get_time_series_metadata( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked + retry-loop example. Returns ------- @@ -1012,6 +1063,7 @@ def get_combined_metadata( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Get combined monitoring-location and time-series metadata. @@ -1112,6 +1164,14 @@ def get_combined_metadata( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked + retry-loop example. Returns ------- @@ -1200,6 +1260,7 @@ def get_latest_continuous( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """This endpoint provides the most recent observation for each time series of continuous data. Continuous data are collected via automated sensors @@ -1329,6 +1390,14 @@ def get_latest_continuous( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked + retry-loop example. Returns ------- @@ -1395,6 +1464,7 @@ def get_latest_daily( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the day. @@ -1526,6 +1596,14 @@ def get_latest_daily( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked + retry-loop example. Returns ------- @@ -1593,6 +1671,7 @@ def get_field_measurements( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Field measurements are physically measured values collected during a visit to the monitoring location. Field measurements consist of measurements @@ -1714,6 +1793,14 @@ def get_field_measurements( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked + retry-loop example. Returns ------- @@ -1777,6 +1864,7 @@ def get_field_measurements_metadata( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Get field-measurement metadata: one row per (location, parameter) series. @@ -1832,6 +1920,14 @@ def get_field_measurements_metadata( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked + retry-loop example. Returns ------- @@ -1898,6 +1994,7 @@ def get_peaks( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """Get the annual peak streamflow / stage record for a monitoring location. @@ -1956,6 +2053,14 @@ def get_peaks( and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked + retry-loop example. Returns ------- @@ -2695,6 +2800,7 @@ def get_channel( filter: str | None = None, filter_lang: FILTER_LANG | None = None, convert_type: bool = True, + resume_from: BaseMetadata | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """ Channel measurements taken as part of streamflow field measurements. @@ -2808,6 +2914,14 @@ def get_channel( convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector + resume_from : BaseMetadata, optional + Metadata returned alongside a ``PartialResult`` (or + ``QuotaExhausted``) exception from a previous call. The chunker + consults its ``chunk_manifest`` to skip already-completed + sub-requests and fetch only the remainder. Pass the same other + kwargs as the original call. See the + :ref:`waterdata-chunking-resume` user guide for a worked + retry-loop example. Returns ------- diff --git a/dataretrieval/waterdata/chunking.py b/dataretrieval/waterdata/chunking.py new file mode 100644 index 00000000..d56bc730 --- /dev/null +++ b/dataretrieval/waterdata/chunking.py @@ -0,0 +1,721 @@ +"""Multi-value GET-parameter chunking for the Water Data OGC getters. + +PR 233 routes most services through GET with comma-separated values +(e.g. ``monitoring_location_id=USGS-A,USGS-B,...``). Long lists can blow +the server's ~8 KB URL byte limit. This module adds a decorator that +sits OUTSIDE ``filters.chunked`` and splits multi-value list params +across multiple sub-requests so each URL fits. See ``get_daily``'s +docstring for an end-to-end chained-query example. + +Design (orthogonal to filter chunking): + +- N-dimensional cartesian product: for each chunkable list param, the + values are partitioned into sub-lists; the planner emits the cartesian + product of those partitions. Sub-chunks of the same dim never overlap, + so frame concat needs no dedup across multi-value chunks. +- Greedy halving of the largest chunk in any dim until the worst-case + sub-request URL fits the limit. Minimises total request count. +- Date params, ``bbox``, and ``properties`` are not chunked: dates are + intervals not enumerable sets; bbox is a coord array; ``properties`` + determines output schema and chunking it would shard columns. + +Coordination with ``filters.chunked``: +The planner probes the URL with a synthetic clause sized to the inner +chunker's bail floor — ``len(longest_clause) * max(per-clause encoding +ratio)`` — when a chunkable filter is present. The inner chunker bails +(emits the full filter) when any single clause's URL-encoded length +exceeds its per-sub-request budget; mirroring +``filters._effective_filter_budget``, that floor already accounts for +the worst per-call encoding ratio, so a long alphanumeric clause +coexisting with a shorter heavily-encoded clause is sized correctly. +Without this coordination, a long OR-filter plus multi-value lists +would trigger a premature ``RequestTooLarge`` even though the combined +chunkers would have made things fit. +""" + +from __future__ import annotations + +import datetime +import functools +import itertools +import math +from collections.abc import Callable +from dataclasses import dataclass +from typing import Any +from urllib.parse import quote_plus + +import pandas as pd +import requests + +from . import filters +from .filters import ( + _combine_chunk_frames, + _combine_chunk_responses, + _FetchOnce, + _is_chunkable, + _max_per_clause_encoding_ratio, + _split_top_level_or, +) + +# Params that look like lists but must NOT be chunked. ``properties`` is +# excluded because it defines the response schema; chunking it would +# return frames with different columns per sub-request. ``bbox`` is a +# fixed 4-element coord tuple. Date params are intervals not sets. The +# CQL ``filter`` (and its ``filter_lang``) is a string that has its own +# inner chunker (``filters.chunked``); if a caller passes ``filter`` as +# a list, treating it as a multi-value param would emit malformed CQL. +_NEVER_CHUNK = frozenset( + { + "properties", + "bbox", + "datetime", + "last_modified", + "begin", + "begin_utc", + "end", + "end_utc", + "time", + "filter", + "filter_lang", + } +) + +# Default cap on the number of sub-requests a single chunked call may +# emit. The USGS Water Data API rate-limits each HTTP request (including +# pagination), so the true budget is ``hourly_quota / avg_pages_per_chunk``. +# 1000 matches the default hourly quota and is a reasonable upper bound +# for single-page sub-requests; tune lower if your queries paginate. +# Override per-decorator via ``max_chunks=`` or by monkeypatching this +# module attribute — both the decorator wrapper and ``_plan_chunks`` +# read it lazily. +_DEFAULT_MAX_CHUNKS = 1000 + +# When ``x-ratelimit-remaining`` drops below this between sub-requests, +# the chunker bails with ``QuotaExhausted`` rather than risk a mid-call +# HTTP 429. Carries the partial result so callers can resume from a +# known offset instead of retrying the whole chunked call from scratch. +_DEFAULT_QUOTA_SAFETY_FLOOR = 50 + +# Response header USGS uses to advertise remaining hourly quota. +_QUOTA_HEADER = "x-ratelimit-remaining" + +# Reserved kwarg name the wrapper strips from ``args`` before any planner +# or URL probe runs — it's a chunker-only control parameter and must not +# reach the underlying API as a bogus query parameter. +_RESUME_FROM_KEY = "resume_from" + +# Sentinel returned by ``_read_remaining`` when the response has no +# parseable ``x-ratelimit-remaining`` header. Large enough to beat any +# plausible safety floor so a missing/malformed header doesn't trigger +# spurious ``QuotaExhausted`` aborts. +_QUOTA_UNKNOWN = 10**9 + + +class RequestTooLarge(ValueError): + """Raised when a chunked request cannot be issued. Two cases: + (1) URL exceeds the byte limit even with every multi-value param at + a singleton chunk and any chunkable filter at the inner chunker's + bail-floor size (the URL contribution of its longest single + OR-clause, after URL-encoding); (2) the cartesian-product plan + would issue more than ``max_chunks`` sub-requests.""" + + +# Normalized plan shape: a tuple of ``(dim_name, tuple-of-chunk-tuples)`` +# pairs. Used for hashing/equality on ``ChunkManifest`` and to compare a +# resumed call's fresh plan against the saved manifest's plan. +_NormalizedPlan = tuple[tuple[str, tuple[tuple[Any, ...], ...]], ...] + + +def _normalize_plan(plan: dict[str, list[list[Any]]]) -> _NormalizedPlan: + """Convert the planner's mutable nested-list plan to an immutable, + comparable nested-tuple form. Preserves insertion order, which is + the cartesian-product iteration order (Python 3.7+ dict guarantee).""" + return tuple( + (key, tuple(tuple(chunk) for chunk in chunks)) for key, chunks in plan.items() + ) + + +@dataclass(frozen=True) +class ChunkManifest: + """Snapshot of a chunked call's progress, sufficient to resume. + + Attached to ``BaseMetadata.chunk_manifest`` on every chunked call + (successful or failed). On a failed call the manifest records how + many sub-requests completed; passing the partial metadata back via + ``resume_from=`` re-runs only the remaining cartesian-product + combinations. Pinning the normalized plan (not just the input + args) lets resume detect when a caller has changed their inputs + between the original call and the retry — same-looking args that + chunk differently would silently re-fetch wrong sub-ranges. + + Attributes + ---------- + plan : tuple + Normalized form of the chunker's cartesian-product plan: a + tuple of ``(dim_name, tuple-of-chunk-tuples)`` pairs, in the + order the planner iterates them. + completed : int + Number of sub-requests that completed before the call + terminated. Equal to ``total`` on a fully successful call. + """ + + plan: _NormalizedPlan + completed: int + + @property + def total(self) -> int: + """Total sub-requests in the cartesian-product plan.""" + return math.prod(len(chunks) for _, chunks in self.plan) + + @property + def is_complete(self) -> bool: + """``True`` when every sub-request in the plan completed.""" + return self.completed >= self.total + + @property + def remaining(self) -> int: + """Number of sub-requests still to fetch on resume.""" + return max(self.total - self.completed, 0) + + def __repr__(self) -> str: + return ( + f"ChunkManifest(completed={self.completed}/{self.total}, " + f"dims={len(self.plan)})" + ) + + +class PartialResult(RuntimeError): + """Raised mid-chunked-call when any sub-request fails. Carries the + combined partial frame and a ``ChunkManifest`` recording how many + sub-requests completed. Catch this exception to access partial + data, then re-call the original getter with the partial metadata + via ``resume_from=`` to fetch the remaining chunks. + + ``__cause__`` (set via ``raise ... from exc``) holds the underlying + exception — typically a ``RuntimeError`` from ``_walk_pages``'s + mid-pagination failure handler (see PR #279) or a transport-level + ``requests`` exception. + + Attributes + ---------- + partial_frame : pd.DataFrame + Concatenated, deduplicated result of every sub-request that + completed in this call. Empty if the first attempted + sub-request failed. + partial_response : requests.Response + Aggregated response carrying the canonical URL (the user's + full original query), last successful sub-request's headers, + summed ``elapsed``, and ``chunk_manifest`` attribute set so + ``BaseMetadata(partial_response).chunk_manifest`` exposes the + manifest. + manifest : ChunkManifest + Records the chunk plan and the number of completed + sub-requests. Pass the wrapping metadata back via + ``resume_from=`` to resume. + """ + + def __init__( + self, + *, + partial_frame: pd.DataFrame, + partial_response: requests.Response, + manifest: ChunkManifest, + cause: BaseException | None = None, + message: str | None = None, + ) -> None: + super().__init__(message or _partial_result_message(manifest, cause)) + self.partial_frame = partial_frame + self.partial_response = partial_response + self.manifest = manifest + + @property + def partial_metadata(self): + """``BaseMetadata`` wrapping ``partial_response``. Lazy to avoid + a circular import with ``dataretrieval.utils`` at module load.""" + from dataretrieval.utils import BaseMetadata + + return BaseMetadata(self.partial_response) + + +def _partial_result_message( + manifest: ChunkManifest, cause: BaseException | None +) -> str: + """Single source of truth for the user-facing failure message. The + ``cause`` branch names the failing sub-request and underlying + exception (most common path); the no-cause branch is the generic + aborted-after-N form used by ``QuotaExhausted``'s pre-emptive bail.""" + tail = ( + " Catch PartialResult to access .partial_frame and resume with " + "resume_from=partial_metadata." + ) + if cause is None: + return ( + f"Chunked request failed after {manifest.completed}/" + f"{manifest.total} sub-requests.{tail}" + ) + return ( + f"Chunked request failed at sub-request {manifest.completed + 1}/" + f"{manifest.total} ({type(cause).__name__}: {cause}).{tail}" + ) + + +class QuotaExhausted(PartialResult): + """Raised mid-chunked-call when the API's reported remaining quota + (``x-ratelimit-remaining`` header) drops below the configured + safety floor. The chunker stops before issuing the next + sub-request to avoid a mid-call HTTP 429 that would silently + truncate paginated results (see PR #273). + + Inherits ``partial_frame``, ``partial_response``, and ``manifest`` + from ``PartialResult``. Adds ``remaining`` (the last observed + header value). + """ + + def __init__( + self, + *, + partial_frame: pd.DataFrame, + partial_response: requests.Response, + manifest: ChunkManifest, + remaining: int, + ) -> None: + super().__init__( + partial_frame=partial_frame, + partial_response=partial_response, + manifest=manifest, + message=( + f"x-ratelimit-remaining dropped to {remaining} after " + f"{manifest.completed}/{manifest.total} chunks; aborting to " + f"avoid mid-call HTTP 429. Catch QuotaExhausted to access " + f".partial_frame and resume with resume_from=partial_metadata " + f"after the hourly window resets." + ), + ) + self.remaining = remaining + + +def _chunkable_params(args: dict[str, Any]) -> dict[str, list[Any]]: + """Return ``{name: list(values)}`` for every list/tuple kwarg with + >1 element that is allowed to chunk.""" + return { + k: list(v) + for k, v in args.items() + if k not in _NEVER_CHUNK and isinstance(v, (list, tuple)) and len(v) > 1 + } + + +def _filter_aware_probe_args(args: dict[str, Any]) -> dict[str, Any]: + """Substitute the filter with a synthetic ASCII clause sized to the + inner chunker's bail floor, so the planner's URL probe matches what + the inner chunker would emit. + + The inner ``filters.chunked`` bails (emits the full filter) when any + single OR-clause's URL-encoded length exceeds the per-sub-request + budget. Mirroring ``filters._effective_filter_budget``, that floor + is ``len(longest_clause) * max(per-clause encoding ratio)``. + Substituting an ASCII clause of that exact length makes + ``quote_plus`` a no-op, so the URL builder sees exactly the + bail-floor byte count. + """ + filter_expr = args.get("filter") + filter_lang = args.get("filter_lang") + if not _is_chunkable(filter_expr, filter_lang): + return args + parts = _split_top_level_or(filter_expr) + if len(parts) < 2: + return args # one-clause filter — inner chunker can't shrink it + longest_raw = max(len(p) for p in parts) + probe_size = math.ceil(longest_raw * _max_per_clause_encoding_ratio(parts)) + return {**args, "filter": "x" * probe_size} + + +def _chunk_bytes(chunk: list[Any]) -> int: + """URL-encoded byte length of ``chunk`` when comma-joined into a + URL parameter value. + + Used as the planner's biggest-chunk comparator in + ``_worst_case_args`` and the halving loop. ``quote_plus`` (rather + than raw ``,``-join length) keeps the comparator faithful to what + the real URL builder produces, so values containing characters + that expand under URL encoding (``%``, ``+``, ``/``, ``&``, …) + can't be mis-ranked. For typical USGS multi-value workloads + (alphanumeric IDs and codes) raw and encoded lengths are equal, + but the encoded form is always correct. + """ + return len(quote_plus(",".join(map(str, chunk)))) + + +def _request_bytes(req: requests.PreparedRequest) -> int: + """Total bytes of a prepared request: URL + body. + + GET routes have ``body=None`` and reduce to URL length. POST routes + (CQL2 JSON body) need body bytes — the URL stays short regardless of + payload, so URL-only sizing would underestimate the request and skip + chunking when it's needed. + + Raises ``TypeError`` on non-sizable bodies (generators, file-like + streams). Size-based planning needs a deterministic byte count; + silently treating an unknown body as zero bytes would under-chunk + and let the request blow past the server's POST-body limit. + """ + url_len = len(req.url) + body = req.body + if body is None: + return url_len + if isinstance(body, (bytes, bytearray)): + return url_len + len(body) + if isinstance(body, str): + return url_len + len(body.encode("utf-8")) + raise TypeError( + f"multi_value_chunked cannot size a request body of type " + f"{type(body).__name__!r}; pass str, bytes, or None. Streaming " + f"bodies (generators, file-like) are not supported because the " + f"planner needs a deterministic byte count up front." + ) + + +def _plan_total(plan: dict[str, list[list[Any]]]) -> int: + """Sub-request count a plan will issue: the cartesian product of + per-dim chunk counts.""" + return math.prod(len(chunks) for chunks in plan.values()) + + +def _worst_case_args( + probe_args: dict[str, Any], plan: dict[str, list[list[Any]]] +) -> dict[str, Any]: + """Args representing the worst-case sub-request the plan will issue: + each dim's largest chunk (by URL-encoded bytes), composed onto + the ``probe_args`` already returned by ``_filter_aware_probe_args`` + so any chunkable filter sits at the inner chunker's bail-floor + size. The planner feeds these args through ``_request_bytes`` to + decide whether the biggest sub-request fits the budget.""" + out = dict(probe_args) + for k, chunks in plan.items(): + out[k] = max(chunks, key=_chunk_bytes) + return out + + +def _plan_chunks( + args: dict[str, Any], + build_request: Callable[..., Any], + url_limit: int, + max_chunks: int | None = None, +) -> dict[str, list[list[Any]]] | None: + """Greedy halving until the worst-case sub-request fits ``url_limit``. + + Budget is total request bytes (URL + body, via ``_request_bytes``) + so POST routes size correctly — see ``multi_value_chunked`` for the + parameter-name caveat. + + Returns ``None`` when no chunking is needed (request as-is fits or + no chunkable lists). Raises ``RequestTooLarge`` when: + - the smallest reducible plan still exceeds ``url_limit`` (every + multi-value param at a singleton chunk and any chunkable filter + already at the inner chunker's bail-floor size), or + - the cartesian-product plan exceeds ``max_chunks`` sub-requests + (the hourly API budget); checked after each split so we bail + promptly once the cap is unreachable. + + ``max_chunks`` defaults to ``_DEFAULT_MAX_CHUNKS`` resolved at call + time, so monkeypatching the module constant takes effect for + direct callers too. + """ + if max_chunks is None: + max_chunks = _DEFAULT_MAX_CHUNKS + if max_chunks < 1: + raise ValueError( + f"max_chunks must be >= 1; got {max_chunks}. Zero or negative " + f"values would silently bypass the cap on the no-chunking path." + ) + chunkable = _chunkable_params(args) + if not chunkable: + return None + probe_args = _filter_aware_probe_args(args) + if _request_bytes(build_request(**probe_args)) <= url_limit: + return None + + plan: dict[str, list[list[Any]]] = {k: [v] for k, v in chunkable.items()} + + while True: + worst = _worst_case_args(probe_args, plan) + if _request_bytes(build_request(**worst)) <= url_limit: + return plan + + # Largest splittable chunk across all dims, by URL-encoded bytes. + splittable = ( + (dim, idx, chunk) + for dim, dim_chunks in plan.items() + for idx, chunk in enumerate(dim_chunks) + if len(chunk) > 1 + ) + biggest = max(splittable, key=lambda t: _chunk_bytes(t[2]), default=None) + if biggest is None: + raise RequestTooLarge( + f"Request exceeds {url_limit} bytes (URL + body) at the " + f"smallest reducible plan: every multi-value parameter " + f"at a singleton chunk and any chunkable filter at the " + f"inner chunker's bail-floor size. Reduce the number " + f"of values, shorten the filter, or split the call " + f"manually." + ) + dim, idx, chunk = biggest + mid = len(chunk) // 2 + plan[dim] = plan[dim][:idx] + [chunk[:mid], chunk[mid:]] + plan[dim][idx + 1 :] + + # Each split only grows the cartesian product, so once we + # cross max_chunks we can never come back under. Bail now + # rather than keep splitting (the URL probe could still take + # many more iterations). + total = _plan_total(plan) + if total > max_chunks: + raise RequestTooLarge( + f"Chunked plan would issue {total} sub-requests, exceeding " + f"max_chunks={max_chunks} (USGS API's default hourly rate " + f"limit per key). Reduce input list sizes, narrow the time " + f"window, or raise max_chunks if you have a higher quota." + ) + + +def _read_remaining(response: requests.Response) -> int: + """Parse ``x-ratelimit-remaining`` from a response. Missing or + malformed header → return ``_QUOTA_UNKNOWN`` so the safety check + treats it as 'plenty of quota' (don't abort on header glitches).""" + raw = response.headers.get(_QUOTA_HEADER) + if raw is None: + return _QUOTA_UNKNOWN + try: + return int(raw) + except (TypeError, ValueError): + return _QUOTA_UNKNOWN + + +def _assemble_partial( + *, + frames: list[pd.DataFrame], + responses: list[requests.Response], + canonical_url: str, + plan_norm: _NormalizedPlan, + completed: int, +) -> tuple[pd.DataFrame, requests.Response, ChunkManifest]: + """Build the partial-state triple shared by every chunker abort path + (quota-exhausted, sub-request error, and the success path's final + response). ``responses`` may be empty (first attempted sub-request + failed); the synthesized response then carries only the canonical + URL so caller-side ``BaseMetadata.chunk_manifest`` still works. + """ + if responses: + partial = _combine_chunk_responses(responses) + else: + partial = requests.Response() + partial.elapsed = datetime.timedelta(0) + partial.url = canonical_url + manifest = ChunkManifest(plan=plan_norm, completed=completed) + partial.chunk_manifest = manifest + partial_frame = _combine_chunk_frames(frames) if frames else pd.DataFrame() + return partial_frame, partial, manifest + + +def _resolve_resume( + resume_from: Any, plan: dict[str, list[list[Any]]] | None +) -> tuple[int, _NormalizedPlan]: + """Validate a ``resume_from`` metadata against a freshly computed + plan and return ``(start_index, normalized_plan)``. + + A resume call is only valid when the freshly chunked plan matches + the saved manifest exactly. Mismatched plans mean the caller's args + changed between the original call and the retry — silently + re-fetching with the new plan would produce a frame that + interleaves data from two incompatible queries. + """ + manifest = getattr(resume_from, "chunk_manifest", None) + if manifest is None: + raise ValueError( + "resume_from has no chunk_manifest. The original call was " + "not chunked (or the metadata is from a different source), " + "so there's nothing to resume." + ) + if plan is None: + raise ValueError( + "resume_from was provided but the current args do not " + "produce a chunk plan — the request fits in one round-trip. " + "Pass the same kwargs as the original call to resume." + ) + fresh = _normalize_plan(plan) + if fresh != manifest.plan: + raise ValueError( + "resume_from manifest does not match the current chunk plan. " + "The kwargs passed to this call would chunk differently than " + "the original. Pass identical kwargs (minus resume_from) to " + "resume; otherwise drop resume_from to issue a fresh query." + ) + if manifest.is_complete: + raise ValueError( + f"resume_from manifest is already complete " + f"({manifest.completed}/{manifest.total} chunks). There is " + f"nothing to resume; drop resume_from." + ) + return manifest.completed, fresh + + +def multi_value_chunked( + *, + build_request: Callable[..., Any], + url_limit: int | None = None, + max_chunks: int | None = None, + quota_safety_floor: int | None = None, +) -> Callable[[_FetchOnce], _FetchOnce]: + """Decorator that splits multi-value list params across sub-requests + so each sub-request fits ``url_limit`` bytes (defaults to + ``filters._WATERDATA_URL_BYTE_LIMIT``) and the cartesian-product + plan stays ≤ ``max_chunks`` sub-requests (defaults to + ``_DEFAULT_MAX_CHUNKS``). All defaults are resolved at call time so + tests/users that patch the module constants affect this decorator + uniformly. + + ``url_limit`` is enforced against total request bytes (URL + body, + via ``_request_bytes``); the name reflects the dominant GET case + where body is empty. POST routes (e.g. ``monitoring-locations`` via + CQL2 JSON) are conservatively sized — never under-chunks, but may + over-chunk at the body's true ceiling. + + Between sub-requests the wrapper reads ``x-ratelimit-remaining`` from + each response. If it drops below ``quota_safety_floor`` (default + ``_DEFAULT_QUOTA_SAFETY_FLOOR``), the wrapper raises ``QuotaExhausted`` + carrying the combined partial result and a ``ChunkManifest`` so + callers can resume after the hourly window resets via + ``resume_from=partial_metadata``, instead of crashing into a + mid-pagination HTTP 429. + + Any other ``Exception`` raised inside a sub-request — transport + errors, mid-pagination ``RuntimeError``, inner-filter + ``RequestTooLarge``, or unexpected exceptions from the wrapped + function — is caught and re-raised as ``PartialResult`` with the + same partial-state payload, with the underlying exception preserved + via ``__cause__``. ``BaseException`` subclasses (``KeyboardInterrupt``, + ``SystemExit``) propagate unchanged. + + Sits OUTSIDE ``@filters.chunked``: list-chunking is the outer loop, + filter-chunking is the inner loop. The wrapped function has the same + signature as ``filters.chunked`` expects — ``(args: dict) -> (frame, + response)`` — so the two decorators compose cleanly. The planner is + filter-aware so it doesn't raise prematurely when the inner filter + chunker would have shrunk the per-sub-request URL on its own. + + Sub-requests run sequentially with no per-call timeout enforced here. + A hung single sub-request will block the entire chunked call; the + caller is responsible for configuring an HTTP-layer timeout (e.g. + via a ``requests.Session`` wrapper) if bounded latency matters. + + Cartesian-product iteration order is deterministic for a given + ``args`` dict: the wrapper iterates ``plan.values()`` in insertion + order (Python 3.7+ guarantee), which equals the order in which + chunkable params appeared in ``args``. For the public waterdata + getters that order is the function-signature order, so + ``ChunkManifest.completed`` maps to the same sub-requests across + repeated calls with the same arguments — resume is well-defined. + + Resume is triggered by passing ``resume_from=partial_metadata`` in + the caller's kwargs. The wrapper pops it before planning (so it + never reaches the underlying HTTP request), validates the saved + plan matches the fresh plan, and skips the already-completed + cartesian-product combinations. See the + ``waterdata-chunking-resume`` user-guide page for a worked + retry-loop example using a one-hour deadline matched to the + API's rate-limit window. + """ + + def decorator(fetch_once: _FetchOnce) -> _FetchOnce: + @functools.wraps(fetch_once) + def wrapper( + args: dict[str, Any], + ) -> tuple[pd.DataFrame, requests.Response]: + limit = ( + url_limit + if url_limit is not None + else filters._WATERDATA_URL_BYTE_LIMIT + ) + floor = ( + quota_safety_floor + if quota_safety_floor is not None + else _DEFAULT_QUOTA_SAFETY_FLOOR + ) + # ``resume_from`` is a chunker-only control kwarg; strip it + # before the planner runs so it can't reach the underlying + # API as a bogus query parameter. + args = dict(args) + resume_from = args.pop(_RESUME_FROM_KEY, None) + + plan = _plan_chunks(args, build_request, limit, max_chunks) + + if resume_from is not None: + start_index, plan_norm = _resolve_resume(resume_from, plan) + else: + start_index = 0 + plan_norm = _normalize_plan(plan) if plan is not None else () + + if plan is None: + return fetch_once(args) + + # Rebuild the canonical original-query URL so the aggregated + # response's ``.url`` reflects the user's request, not the + # first sub-request's sliced view. + canonical_url = build_request(**args).url + + keys = list(plan) + total = _plan_total(plan) + frames: list[pd.DataFrame] = [] + responses: list[requests.Response] = [] + combos = itertools.islice( + itertools.product(*(plan[k] for k in keys)), start_index, None + ) + for i, combo in enumerate(combos, start=start_index): + sub_args = {**args, **dict(zip(keys, combo))} + try: + frame, response = fetch_once(sub_args) + except Exception as exc: + partial_frame, partial_response, manifest = _assemble_partial( + frames=frames, + responses=responses, + canonical_url=canonical_url, + plan_norm=plan_norm, + completed=i, + ) + raise PartialResult( + partial_frame=partial_frame, + partial_response=partial_response, + manifest=manifest, + cause=exc, + ) from exc + frames.append(frame) + responses.append(response) + # Skip the quota check after the last sub-request — + # nothing left to abort. + if i < total - 1: + remaining = _read_remaining(response) + if remaining < floor: + partial_frame, partial_response, manifest = _assemble_partial( + frames=frames, + responses=responses, + canonical_url=canonical_url, + plan_norm=plan_norm, + completed=i + 1, + ) + raise QuotaExhausted( + partial_frame=partial_frame, + partial_response=partial_response, + manifest=manifest, + remaining=remaining, + ) + + combined_frame, combined, _ = _assemble_partial( + frames=frames, + responses=responses, + canonical_url=canonical_url, + plan_norm=plan_norm, + completed=total, + ) + return combined_frame, combined + + return wrapper # type: ignore[return-value] + + return decorator diff --git a/dataretrieval/waterdata/filters.py b/dataretrieval/waterdata/filters.py index 4c136b82..deac263e 100644 --- a/dataretrieval/waterdata/filters.py +++ b/dataretrieval/waterdata/filters.py @@ -152,6 +152,18 @@ def _chunk_cql_or(expr: str, max_len: int = _CQL_FILTER_CHUNK_LEN) -> list[str]: return chunks +def _max_per_clause_encoding_ratio(parts: list[str]) -> float: + """Worst per-clause ``len(quote_plus(p)) / len(p)`` across OR-clauses. + + Any sub-request chunk could end up containing only the heavier-encoding + clauses, so per-sub-request byte budgets must be sized against the + worst (not average) ratio to avoid overflow. Used by both this + module's filter chunker and the outer ``chunking._filter_aware_probe_args``; + pinning the formula here keeps the two from drifting. + """ + return max(len(quote_plus(p)) / len(p) for p in parts) + + def _effective_filter_budget( args: dict[str, Any], filter_expr: str, @@ -163,8 +175,7 @@ def _effective_filter_budget( non-filter URL bytes by building the request with a 1-byte placeholder filter, subtract from the URL limit to get the bytes available for the encoded filter, then convert back to raw CQL bytes via the *maximum* - per-clause encoding ratio (a chunk could contain only the heavier-encoding - clauses, so budgeting by the average ratio could overflow). + per-clause encoding ratio. """ # Fast path: encoded filter clearly fits with room for any plausible # non-filter URL. Skips the PreparedRequest build and splitter scan. @@ -179,7 +190,7 @@ def _effective_filter_budget( # the caller sees one 414 instead of N parallel sub-request failures. return len(filter_expr) + 1 parts = _split_top_level_or(filter_expr) or [filter_expr] - encoding_ratio = max(len(quote_plus(p)) / len(p) for p in parts) + encoding_ratio = _max_per_clause_encoding_ratio(parts) return max(100, int(available_url_bytes / encoding_ratio)) @@ -268,13 +279,24 @@ def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame: def _combine_chunk_responses( responses: list[requests.Response], ) -> requests.Response: - """Return one response: first chunk's URL/headers + summed ``elapsed``. - - Mutates the first response in place (only ``elapsed``); downstream only - reads ``elapsed`` (in ``BaseMetadata.query_time``), URL, and headers. + """Return one response with the last chunk's headers (for current + rate-limit state) and summed ``elapsed`` (for total wall-clock). + + The returned response's ``.url`` is the *first chunk's* URL, which + only reflects the first slice of the user's query. Callers wanting + the canonical original-query URL on ``BaseMetadata`` must overwrite + ``.url`` themselves (using ``build_request(**original_args).url``); + the decorator wrappers in ``filters.chunked`` and + ``chunking.multi_value_chunked`` do this. + + Mutates the first response in place: ``.headers`` is replaced with + the last response's headers and ``.elapsed`` is accumulated across + all chunks. Downstream reads ``.url``, ``.headers``, and + ``.elapsed`` (via ``BaseMetadata``). """ head = responses[0] if len(responses) > 1: + head.headers = responses[-1].headers head.elapsed = sum((r.elapsed for r in responses[1:]), start=head.elapsed) return head @@ -295,8 +317,12 @@ def chunked(*, build_request: Callable[..., Any]) -> Callable[[_FetchOnce], _Fet - Chunkable cql-text filter: run the lexicographic-pitfall guard, split into URL-length-safe sub-expressions, call the wrapped function once per chunk, concatenate frames (drop empties, dedup by feature ``id``), - and return an aggregated response (first chunk's URL/headers, summed - ``elapsed``). + and return an aggregated response with ``.url`` restored to the + canonical full-filter URL (so ``BaseMetadata.url`` reflects the + user's original query rather than the first filter chunk), last + chunk's headers (so callers see current ``x-ratelimit-remaining``, + which the outer ``multi_value_chunked`` decorator's ``QuotaExhausted`` + guard depends on), and summed ``elapsed``. Either way the return shape matches the undecorated function's, so the caller wraps the response in ``BaseMetadata`` the same way in both paths. @@ -327,7 +353,12 @@ def wrapper( frames.append(frame) responses.append(response) - return _combine_chunk_frames(frames), _combine_chunk_responses(responses) + # Restore the canonical URL representing the user's full filter + # (the aggregated response otherwise carries only the first + # filter-chunk's URL, which misleads callers logging md.url). + combined = _combine_chunk_responses(responses) + combined.url = build_request(**args).url + return _combine_chunk_frames(frames), combined return wrapper # type: ignore[return-value] diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 9245bb92..fe419100 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -5,7 +5,7 @@ import os import re from collections.abc import Iterable, Mapping -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, get_args from zoneinfo import ZoneInfo @@ -14,7 +14,7 @@ from dataretrieval import __version__ from dataretrieval.utils import BaseMetadata -from dataretrieval.waterdata import filters +from dataretrieval.waterdata import chunking, filters from dataretrieval.waterdata.types import ( PROFILE_LOOKUP, PROFILES, @@ -644,6 +644,26 @@ def _get_resp_data(resp: requests.Response, geopd: bool) -> pd.DataFrame: return df +def _finalize_paginated_response( + initial: requests.Response, + last: requests.Response, + total_elapsed: timedelta, +) -> None: + """Carry the last page's headers + cumulative elapsed onto the initial + response in place. + + The initial response stays canonical for ``md.url`` (user's original + query), but its ``.headers`` and ``.elapsed`` are overwritten so the + multi-value chunker's ``QuotaExhausted`` guard sees current + ``x-ratelimit-remaining`` and ``md.query_time`` reflects total + wall-clock across pages. No-op when ``initial is last`` (single page). + """ + if last is initial: + return + initial.headers = last.headers + initial.elapsed = total_elapsed + + def _walk_pages( geopd: bool, req: requests.PreparedRequest, @@ -669,7 +689,11 @@ def _walk_pages( pd.DataFrame A DataFrame containing the aggregated results from all pages. requests.Response - The initial response object containing metadata about the first request. + Aggregated response: the initial request's URL (for query + identity), the final page's headers (so downstream callers see + current rate-limit state, which the multi-value chunker's + ``QuotaExhausted`` guard relies on), and cumulative ``elapsed`` + summed across every page. Raises ------ @@ -700,9 +724,11 @@ def _walk_pages( try: resp = client.send(req) _raise_for_non_200(resp) - - # Store the initial response for metadata + # Keep the original-request response as the "canonical" one for + # ``md.url`` reproducibility; ``.headers`` and ``.elapsed`` get + # overwritten with latest/cumulative values below. initial_response = resp + total_elapsed = resp.elapsed # Grab some aspects of the original request: headers and the # request type (GET or POST) @@ -723,6 +749,7 @@ def _walk_pages( ) _raise_for_non_200(resp) dfs.append(_get_resp_data(resp, geopd=geopd)) + total_elapsed += resp.elapsed curr_url = _next_req_url(resp) except Exception as e: # noqa: BLE001 logger.warning( @@ -730,6 +757,8 @@ def _walk_pages( ) raise RuntimeError(_paginated_failure_message(len(dfs), e)) from e + _finalize_paginated_response(initial_response, resp, total_elapsed) + # Concatenate all pages at once for efficiency return pd.concat(dfs, ignore_index=True), initial_response finally: @@ -957,17 +986,20 @@ def get_ogc_data( return return_list, BaseMetadata(response) +@chunking.multi_value_chunked(build_request=_construct_api_requests) @filters.chunked(build_request=_construct_api_requests) def _fetch_once( args: dict[str, Any], ) -> tuple[pd.DataFrame, requests.Response]: """Send one prepared-args OGC request; return the frame + response. - Filter chunking is added orthogonally by the ``@filters.chunked`` - decorator: with no filter (or an un-chunkable one) the decorator - passes ``args`` through to this body; with a chunkable filter it - fans out and calls this body once per sub-filter, then combines. - Either way the return shape is ``(frame, response)``. + Two orthogonal chunkers wrap this body. ``@chunking.multi_value_chunked`` + (outer) splits multi-value list params (e.g. ``monitoring_location_id``) + across sub-requests so each URL fits the server byte limit; the + cartesian product of per-dim chunks is iterated. ``@filters.chunked`` + (inner) splits long cql-text filters at top-level ``OR``. With no + chunkable inputs both pass through unchanged. Either way the return + shape is ``(frame, response)``. """ req = _construct_api_requests(**args) return _walk_pages(geopd=GEOPANDAS, req=req) @@ -1158,9 +1190,11 @@ def get_stats_data( try: resp = client.send(req) _raise_for_non_200(resp) - - # Store the initial response for metadata + # Keep the original-request response as the "canonical" one for + # ``md.url`` reproducibility; ``.headers`` and ``.elapsed`` get + # overwritten with latest/cumulative values below. initial_response = resp + total_elapsed = resp.elapsed # Grab some aspects of the original request: headers and the # request type (GET or POST) @@ -1186,6 +1220,7 @@ def get_stats_data( _raise_for_non_200(resp) body = resp.json() all_dfs.append(_handle_stats_nesting(body, geopd=GEOPANDAS)) + total_elapsed += resp.elapsed next_token = body["next"] except Exception as e: # noqa: BLE001 logger.warning( @@ -1196,6 +1231,8 @@ def get_stats_data( ) raise RuntimeError(_paginated_failure_message(len(all_dfs), e)) from e + _finalize_paginated_response(initial_response, resp, total_elapsed) + dfs = pd.concat(all_dfs, ignore_index=True) if len(all_dfs) > 1 else all_dfs[0] # . If expand percentiles is True, make each percentile diff --git a/docs/source/userguide/index.rst b/docs/source/userguide/index.rst index 2ba5a93a..a9887f5d 100644 --- a/docs/source/userguide/index.rst +++ b/docs/source/userguide/index.rst @@ -15,3 +15,4 @@ Contents timeconventions dataportals + waterdata_chunking diff --git a/docs/source/userguide/waterdata_chunking.rst b/docs/source/userguide/waterdata_chunking.rst new file mode 100644 index 00000000..ea326982 --- /dev/null +++ b/docs/source/userguide/waterdata_chunking.rst @@ -0,0 +1,135 @@ +.. _waterdata-chunking-resume: + +Chunked Queries and Resuming After Failure +------------------------------------------ + +The OGC ``waterdata`` getters (``get_daily``, ``get_continuous``, +``get_field_measurements``, and the other multi-value-capable +functions) transparently split requests whose URLs would otherwise +exceed the USGS Water Data API's ~8 KB byte limit. A heavy chained +query — e.g. *"pull every stream site in Ohio, then their daily +discharge for the last week"* — fans out into many sub-requests under +the hood and returns one combined DataFrame. + +Long-running chunked calls can fail partway through. Two common +causes: + +- **Quota exhaustion.** The API rate-limits each HTTP request + (including pagination). The chunker monitors the + ``x-ratelimit-remaining`` header between sub-requests and aborts + before issuing the next one if the budget drops below the safety + floor. +- **Transient upstream errors.** A single sub-request can hit a 5xx, + a network blip, or a mid-pagination 429. + +Both cases raise :class:`~dataretrieval.waterdata.chunking.PartialResult` +(the quota case raises its subclass +:class:`~dataretrieval.waterdata.chunking.QuotaExhausted`). The +exception carries the combined partial DataFrame and a +:class:`~dataretrieval.waterdata.chunking.ChunkManifest` that records +how many sub-requests of the cartesian-product plan completed. + +The same getter accepts the partial metadata back via a +``resume_from=`` kwarg. The chunker validates that the freshly-planned +chunk layout matches the saved manifest, then issues only the +outstanding sub-requests. + +The Resume Pattern +****************** + +The canonical idiom: a loop that retries on ``PartialResult``, +accumulates each call's partial DataFrame, and threads the latest +metadata back into the next attempt as ``resume_from=``. The USGS API +rate-limit window is one hour, so a total retry deadline of one hour +is a sensible ceiling — anything longer means the failure is +structural, not transient, and the loop should surface the error +rather than spin forever. + +.. code:: python + + import time + import pandas as pd + from dataretrieval import waterdata + from dataretrieval.waterdata import PartialResult + + sites_df, _ = waterdata.get_monitoring_locations( + state_name="Ohio", + site_type="Stream", + ) + sites = sites_df["monitoring_location_id"].tolist() + + deadline = time.monotonic() + 3600 # one-hour cap + partials = [] + md = None # carries the latest chunk_manifest between attempts + attempt = 0 + + while True: + try: + df, md = waterdata.get_daily( + monitoring_location_id=sites, + parameter_code="00060", + time="P7D", + resume_from=md, # None on the first attempt + ) + break # full result fetched + except PartialResult as exc: + partials.append(exc.partial_frame) + md = exc.partial_metadata + if time.monotonic() >= deadline: + raise TimeoutError( + f"Could not complete chunked query within one hour " + f"({md.chunk_manifest.completed}/" + f"{md.chunk_manifest.total} chunks done)." + ) from exc + attempt += 1 + # Exponential backoff capped at 10 minutes. Quota-reset + # failures benefit from a longer wait; transient transport + # errors clear quickly. The outer deadline still bounds total + # wait time at one hour. + time.sleep(min(60 * 2 ** (attempt - 1), 600)) + + # Each partial frame plus the final ``df`` is disjoint, so a single + # ``concat`` reconstructs what a successful one-shot call would have + # returned. + full = pd.concat([*partials, df], ignore_index=True) + +How Resume Validates the Plan +***************************** + +``ChunkManifest`` pins the *normalized cartesian-product plan*, not +just the input kwargs. If a caller changes their inputs between the +original failure and the retry — even in ways that look equivalent — +the freshly-computed plan would differ from the saved one, and +silently re-fetching would interleave data from two incompatible +queries. The chunker raises ``ValueError`` instead, with one of four +explicit messages: + +- ``"resume_from has no chunk_manifest"`` — the metadata is from a + call that wasn't chunked (or from a different source entirely). +- ``"do not produce a chunk plan"`` — the current args fit in one + round-trip, so there is no plan to skip against. +- ``"manifest does not match the current chunk plan"`` — the input + list changed between calls. +- ``"already complete"`` — the saved manifest is fully consumed; + drop ``resume_from``. + +Inspecting the Manifest on Success +********************************** + +The manifest is also attached to ``BaseMetadata.chunk_manifest`` on +successful chunked calls, so callers can log fan-out information +without catching anything: + +.. code:: python + + df, md = waterdata.get_daily( + monitoring_location_id=sites, + parameter_code="00060", + time="P7D", + ) + if md.chunk_manifest is not None: + m = md.chunk_manifest + logger.info("query fanned out across %d sub-requests", m.total) + +For calls that did not need chunking, ``md.chunk_manifest`` is +``None``. diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index 18e78594..ee1b97ec 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -1,7 +1,9 @@ import datetime import json +import math import sys from unittest import mock +from urllib.parse import quote_plus import pandas as pd import pytest @@ -10,6 +12,8 @@ if sys.version_info < (3, 10): pytest.skip("Skip entire module on Python < 3.10", allow_module_level=True) +from dataretrieval.waterdata import chunking as _chunking +from dataretrieval.waterdata import filters as _filters from dataretrieval.waterdata import ( get_channel, get_combined_metadata, @@ -28,6 +32,21 @@ get_stats_por, get_time_series_metadata, ) +from dataretrieval.waterdata.chunking import ( + _DEFAULT_MAX_CHUNKS, + _DEFAULT_QUOTA_SAFETY_FLOOR, + _QUOTA_HEADER, + ChunkManifest, + PartialResult, + QuotaExhausted, + RequestTooLarge, + _chunkable_params, + _filter_aware_probe_args, + _normalize_plan, + _plan_chunks, + _read_remaining, + multi_value_chunked, +) from dataretrieval.waterdata.utils import ( _check_monitoring_location_id, _check_profiles, @@ -207,6 +226,932 @@ def test_construct_api_requests_two_element_date_list_becomes_interval(): assert "time=2024-01-01%2F2024-01-31" in req.url +# ----- Multi-value GET-parameter chunker (chunking.py) ---------------------- +# +# These tests exercise the planner with a fake ``build_request`` whose URL +# byte length is a deterministic function of its inputs. Tests below model: +# - non-chunkable args contribute ``base_bytes``, +# - every multi-value list contributes ``len(",".join(map(str, v)))``, +# - the ``filter`` kwarg contributes ``len(filter)``. +# This isolates planner behaviour from the real HTTP request builder. + + +class _FakeReq: + __slots__ = ("url", "body") + + def __init__(self, url, body=None): + self.url = url + self.body = body + + +def _fake_build(*, base=200, **kwargs): + """Fake build_request: URL length deterministic in its inputs. + + Mirrors the GET-routed shape: payload goes in the URL, body is None. + List/string values are URL-encoded via ``quote_plus`` so the fake's + byte count matches what the real ``_construct_api_requests`` would + produce; otherwise an alphanumeric test could pass against the fake + but fail in production once values containing ``%``, ``+``, ``/``, + ``&`` etc. (which expand under encoding) reach the same code path. + """ + bytes_ = base + for v in kwargs.values(): + if isinstance(v, (list, tuple)): + bytes_ += len(quote_plus(",".join(map(str, v)))) + elif isinstance(v, str): + bytes_ += len(quote_plus(v)) + return _FakeReq("x" * bytes_) + + +def test_filter_aware_probe_args_passes_through_when_not_chunkable(): + """No filter, json-lang filter, single-clause filter — return unchanged.""" + assert _filter_aware_probe_args({"a": 1}) == {"a": 1} + assert _filter_aware_probe_args({"filter": "a='1'", "filter_lang": "cql-json"}) == { + "filter": "a='1'", + "filter_lang": "cql-json", + } + args = {"filter": "a='single clause with no OR'"} + assert _filter_aware_probe_args(args) == args + + +def test_filter_aware_probe_args_models_inner_chunker_bail_floor(): + """Chunkable filter → return args with filter replaced by a synthetic + ASCII clause whose URL byte count equals the inner chunker's bail + floor ``len(longest) * max(per_clause_encoding_ratio)``. Mirrors + ``filters._effective_filter_budget``'s worst-case model so the + planner doesn't approve plans the inner chunker would refuse.""" + args = {"filter": "a='1' OR a='22' OR a='333'", "x": 7} + probe = _filter_aware_probe_args(args) + parts = ["a='1'", "a='22'", "a='333'"] + expected = math.ceil( + max(len(p) for p in parts) * max(len(quote_plus(p)) / len(p) for p in parts) + ) + assert len(probe["filter"]) == expected + assert probe["filter"].isascii() and probe["filter"].isalnum() + assert probe["x"] == 7 + assert args["filter"] == "a='1' OR a='22' OR a='333'" # input not mutated + + +def test_plan_chunks_returns_none_when_request_fits(): + """URL under limit → planner returns None, decorator passes through.""" + args = {"monitoring_location_id": ["A", "B", "C"]} + plan = _plan_chunks(args, _fake_build, url_limit=8000) + assert plan is None + + +def test_plan_chunks_returns_none_when_no_chunkable_lists(): + """No multi-value lists, however over-limit → planner can't help, returns None + (decorator falls through; server may 414 but that's not chunker's job).""" + args = {"monitoring_location_id": "scalar-only"} + plan = _plan_chunks(args, _fake_build, url_limit=10) + assert plan is None + + +def test_plan_chunks_greedy_halving_targets_largest_dim(): + """Two dims with one much larger — the heavy dim halves first.""" + args = { + "monitoring_location_id": ["X" * 30, "Y" * 30, "Z" * 30, "W" * 30], + "parameter_code": ["00060", "00065"], + } + # full URL ≈ 200 + 123 + 12 = 335; force splitting heavy dim only. + plan = _plan_chunks(args, _fake_build, url_limit=310) + assert len(plan["monitoring_location_id"]) > 1 + assert len(plan["parameter_code"]) == 1 # heavy-dim split was enough + + +def test_plan_chunks_raises_request_too_large_at_singleton_floor(): + """Limit below singleton-per-dim floor (with no chunkable filter to + fall back on) → RequestTooLarge with a clear message.""" + args = {"monitoring_location_id": ["A", "B"]} + # base=200 alone exceeds limit; no relief possible. + with pytest.raises(RequestTooLarge, match="multi-value parameter"): + _plan_chunks(args, _fake_build, url_limit=100) + + +def test_plan_chunks_coordinates_with_filter_chunker(monkeypatch): + """COORDINATION REGRESSION TEST. + + With the FULL filter in URL-length probes, singleton-per-dim URL still + exceeds the limit and the planner would raise RequestTooLarge. With + filter-aware probing, the planner models the per-sub-request URL as + ``worst-dim-chunk + longest-clause-after-encoding`` (the inner filter + chunker's bail floor — it returns the FULL filter if any single + clause exceeds the budget, so the longest clause is the smallest + floor it can guarantee). The probe fits, plan returns. + + Sanity-check the *negative*: with filter-aware probing disabled, the + same inputs would raise. + """ + clauses = [f"f='{i}'" for i in range(10)] + args = { + "monitoring_location_id": ["A" * 10, "B" * 10, "C" * 10, "D" * 10], + "filter": " OR ".join(clauses), + } + # singleton+full-filter ≈ 200 + 10 + 86 = 296 (over limit 240) — would raise. + # longest-clause probe model ≈ 200 + 10 + 5 = 215 (under limit) — plan succeeds. + # (Here all clauses are the same length, so longest == shortest; the + # encoding-ratio coordination matters for lopsided clauses.) + plan = _plan_chunks(args, _fake_build, url_limit=240) + assert plan is not None # coordination prevented the premature raise + assert len(plan["monitoring_location_id"]) > 1 # planner did split + + # Negative control: patch the probe helper to be a no-op (model "no + # filter awareness") and confirm the same inputs raise. + monkeypatch.setattr(_chunking, "_filter_aware_probe_args", lambda a: a) + with pytest.raises(RequestTooLarge): + _plan_chunks(args, _fake_build, url_limit=240) + + +def test_plan_chunks_probes_with_max_clause_not_min(): + """Regression: with lopsided OR-clauses (one short, one long), probing + with min(parts) lets the planner falsely declare a plan feasible that + the inner filter chunker can't actually deliver — it bails when any + single clause exceeds the per-sub-request budget. Probing with the + longest clause is the safe lower bound on per-sub-request filter + size, so the planner correctly raises when no plan can fit.""" + args = { + "sites": ["A" * 10, "B" * 10], + "filter": "x='1' OR x='" + "a" * 28 + "'", # 5-char and 33-char clauses + } + # base 200 + singleton sites 10 + min-clause 5 = 215 (limit 230 -> fits) + # base 200 + singleton sites 10 + max-clause 33 = 243 (limit 230 -> exceeds) + # With min: planner succeeds, but real URL with full filter (42) exceeds + # 230 -> server 414. With max: planner raises early, as it should. + with pytest.raises(RequestTooLarge): + _plan_chunks(args, _fake_build, url_limit=230) + + +def test_plan_chunks_still_raises_when_even_min_clause_doesnt_fit(): + """If the limit is so tight that singleton + shortest-clause STILL + exceeds it, filter chunker can't save us either — raise.""" + args = { + "monitoring_location_id": ["A" * 10, "B" * 10], + "filter": "x='12345' OR x='67890'", # min clause is 9 chars + } + # Singleton + min-clause ≈ 200 + 10 + 9 = 219; limit below that → unrecoverable. + with pytest.raises(RequestTooLarge): + _plan_chunks(args, _fake_build, url_limit=210) + + +def test_multi_value_chunked_passes_through_when_url_fits(): + """No planning needed → decorator calls underlying function exactly once + with the original args.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=8000) + def fetch(args): + calls.append(args) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch({"monitoring_location_id": ["A", "B"]}) + assert len(calls) == 1 + assert calls[0]["monitoring_location_id"] == ["A", "B"] + + +def test_multi_value_chunked_emits_cartesian_product(): + """Two chunkable dims, each split into 2 chunks → exactly 4 sub-calls, + each pairing one chunk from each dim.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + calls.append({k: v for k, v in args.items() if k in ("sites", "pcodes")}) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch( + { + "sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10], + "pcodes": ["P1" * 10, "P2" * 10, "P3" * 10, "P4" * 10], + } + ) + # Both heavy → planner should split both dims. Confirm a cartesian shape: + # every unique site-chunk pairs with every unique pcode-chunk. + sites_seen = {tuple(c["sites"]) for c in calls} + pcodes_seen = {tuple(c["pcodes"]) for c in calls} + assert len(calls) == len(sites_seen) * len(pcodes_seen) + assert len(sites_seen) > 1 + assert len(pcodes_seen) > 1 + + +def test_multi_value_chunked_emits_3d_cartesian_product(): + """Three chunkable dims, each forced to split → exhaustive cartesian + product across all three. Verifies the planner's halving loop handles + N>2 dims uniformly and the wrapper's ``itertools.product`` enumerates + every combination exactly once.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + calls.append(tuple(tuple(args[k]) for k in ("sites", "pcodes", "stats"))) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + fetch( + { + "sites": ["S" * 12 + str(i) for i in range(4)], + "pcodes": ["P" * 12 + str(i) for i in range(4)], + "stats": ["T" * 12 + str(i) for i in range(4)], + } + ) + + # Three independent axes — every (site_chunk, pcode_chunk, stat_chunk) + # triple must appear exactly once. Confirm: + sites_seen = {c[0] for c in calls} + pcodes_seen = {c[1] for c in calls} + stats_seen = {c[2] for c in calls} + + assert len(sites_seen) > 1, "sites dim was not split" + assert len(pcodes_seen) > 1, "pcodes dim was not split" + assert len(stats_seen) > 1, "stats dim was not split" + + # Cartesian shape: # sub-calls == product of unique chunks across dims + expected = len(sites_seen) * len(pcodes_seen) * len(stats_seen) + assert len(calls) == expected, ( + f"expected {expected} cartesian-product sub-calls, got {len(calls)}" + ) + # And no triple repeats (exhaustive enumeration, no duplicates). + assert len(set(calls)) == len(calls) + # The chunked values, when unioned across calls, recover the original list. + assert {x for tup in sites_seen for x in tup} == { + "S" * 12 + str(i) for i in range(4) + } + assert {x for tup in pcodes_seen for x in tup} == { + "P" * 12 + str(i) for i in range(4) + } + assert {x for tup in stats_seen for x in tup} == { + "T" * 12 + str(i) for i in range(4) + } + + +def test_multi_value_chunked_lazy_url_limit(monkeypatch): + """``url_limit=None`` → resolve filters._WATERDATA_URL_BYTE_LIMIT at call + time, so tests that patch the constant affect this decorator too.""" + calls = [] + + @multi_value_chunked(build_request=_fake_build) # url_limit defaults to None + def fetch(args): + calls.append(args) + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + monkeypatch.setattr(_filters, "_WATERDATA_URL_BYTE_LIMIT", 240) + # 4 sites of 10 chars → exceeds 240 → planner splits. + fetch({"sites": ["S" * 10 + str(i) for i in range(4)]}) + assert len(calls) > 1, "patched constant should drive chunking" + + +def test_default_max_chunks_matches_hourly_api_quota(): + """The default cap mirrors the USGS Water Data API's documented + per-API-key hourly limit. Locking this in so future changes have to + explicitly acknowledge the quota.""" + assert _DEFAULT_MAX_CHUNKS == 1000 + + +def test_plan_chunks_raises_when_plan_exceeds_max_chunks(): + """A converged plan with more sub-requests than ``max_chunks`` must + raise rather than silently issue them and burn the user's API quota.""" + # 2 dims with long values, each needing many singleton-ish chunks. + # Pick chunk sizes that converge to a plan exceeding a tight cap. + args = { + "dim_a": [f"long-string-value-{i}" for i in range(50)], + "dim_b": [f"another-long-value-{i}" for i in range(50)], + } + # url_limit forces splitting; max_chunks=10 forces the cap to fire. + with pytest.raises(RequestTooLarge, match="exceeding max_chunks=10"): + _plan_chunks(args, _fake_build, url_limit=250, max_chunks=10) + + +def test_plan_chunks_respects_default_cap_without_explicit_arg(): + """Default kwarg path: ``max_chunks`` defaults to _DEFAULT_MAX_CHUNKS + when not specified, so direct callers (e.g., other library code) get + the same safety net as the decorator wrapper.""" + args = { + "dim_a": [f"v{i:03d}" for i in range(60)], + "dim_b": [f"v{i:03d}" for i in range(60)], + "dim_c": [f"v{i:03d}" for i in range(60)], + } + # Without explicit max_chunks: defaults to 1000. The plan for these + # inputs would emit > 1000 sub-requests at a tight limit, so should + # raise on default cap alone. + with pytest.raises(RequestTooLarge, match=r"max_chunks=1000"): + _plan_chunks(args, _fake_build, url_limit=220) + + +def test_multi_value_chunked_cap_override(): + """A decorator-time ``max_chunks`` override lets callers with higher + quotas raise the ceiling without monkeypatching the module constant.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=220, max_chunks=10) + def fetch(args): + return pd.DataFrame(), mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + + with pytest.raises(RequestTooLarge, match="exceeding max_chunks=10"): + fetch( + { + "dim_a": [f"longer-v{i}" for i in range(30)], + "dim_b": [f"longer-v{i}" for i in range(30)], + } + ) + + +def _quota_response(remaining: int | str | None) -> mock.Mock: + """A mock requests.Response-like object whose ``x-ratelimit-remaining`` + header reflects the given value (None → header absent).""" + resp = mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + resp.headers = {} if remaining is None else {_QUOTA_HEADER: str(remaining)} + return resp + + +def test_read_remaining_parses_header(): + assert _read_remaining(_quota_response(42)) == 42 + + +def test_read_remaining_treats_missing_header_as_plenty(): + """Servers that don't echo a rate-limit header must not trigger + spurious QuotaExhausted aborts. Sentinel is a large integer so any + plausible safety floor compares cleanly.""" + assert _read_remaining(_quota_response(None)) >= 1_000_000 + + +def test_read_remaining_treats_malformed_header_as_plenty(): + """Defensive: non-integer header value → don't abort.""" + assert _read_remaining(_quota_response("not-a-number")) >= 1_000_000 + + +def test_default_quota_safety_floor(): + """Default floor lives at 50 — enough headroom for one final + chunked call's pagination spike without breaching the hourly cap.""" + assert _DEFAULT_QUOTA_SAFETY_FLOOR == 50 + + +def test_multi_value_chunked_aborts_when_quota_floor_breached(): + """Mid-call, when ``x-ratelimit-remaining`` drops below the floor, + the chunker must raise ``QuotaExhausted`` *before* issuing the next + sub-request — and the exception must carry the partial frame plus + the chunk offset so callers can resume.""" + # Build a fetch_once whose response 'remaining' header decrements + # through 200, 100, 40 (below floor=50), 10. + remaining_seq = iter([200, 100, 40, 10]) + page_idx = iter(range(10)) + + def fetch(args): + idx = next(page_idx) + return ( + pd.DataFrame( + {"site": list(args["sites"]), "page": [idx] * len(args["sites"])} + ), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked( + build_request=_fake_build, + url_limit=240, + quota_safety_floor=50, + )(fetch) + + # Plan forces 4 sub-requests (4 singleton site chunks). + with pytest.raises(QuotaExhausted) as excinfo: + decorated({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + + err = excinfo.value + # Aborted after the 3rd sub-request (remaining=40 < floor=50). + assert err.manifest.completed == 3 + assert err.manifest.total == 4 + assert err.remaining == 40 + # Partial frame combines rows from the first three completed sub-requests. + assert err.partial_frame is not None + assert set(err.partial_frame["page"]) == {0, 1, 2} + # The partial_response carries the manifest so callers can resume. + assert err.partial_response.chunk_manifest is err.manifest + assert err.manifest.is_complete is False + assert err.manifest.remaining == 1 + + +def test_multi_value_chunked_does_not_abort_on_last_chunk(): + """Aborting on the final sub-request would be pointless — there's + no 'next' to protect. The check is skipped there. Earlier chunks + stay above the floor; only the last drops below, and we still + return cleanly because the check is skipped at i == total-1.""" + remaining_seq = iter([500, 5]) # only the LAST chunk dips below floor=50 + + def fetch(args): + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked( + build_request=_fake_build, + url_limit=240, + quota_safety_floor=50, + )(fetch) + + df, _ = decorated({"sites": ["S1" * 10, "S2" * 10]}) # forces 2 chunks + assert len(df) == 2 # no raise — both chunks ran + + +def test_multi_value_chunked_quota_check_disabled_with_zero_floor(): + """Setting the floor to 0 effectively disables the quota guard — + counter can go to 1 without aborting (since 1 > 0 = floor).""" + remaining_seq = iter([5, 1]) + + def fetch(args): + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(next(remaining_seq)), + ) + + decorated = multi_value_chunked( + build_request=_fake_build, + url_limit=240, + quota_safety_floor=0, + )(fetch) + df, _ = decorated({"sites": ["S1" * 10, "S2" * 10]}) + assert len(df) == 2 # no raise + + +def test_quota_exhausted_message_includes_resume_offset(): + """The error message must point the user at the chunk offset to + resume from, otherwise the partial_frame attribute is a footgun + — the user has no way to know which chunks still need re-issuing.""" + manifest = ChunkManifest( + plan=(("sites", tuple(("s" + str(i),) for i in range(20))),), + completed=7, + ) + e = QuotaExhausted( + partial_frame=pd.DataFrame(), + partial_response=mock.Mock(), + manifest=manifest, + remaining=12, + ) + msg = str(e) + assert "7/20" in msg + assert "12" in msg + assert "QuotaExhausted" in msg or "resume" in msg + + +def test_request_bytes_rejects_non_sizable_body(): + """``_request_bytes`` requires a deterministic byte count up front; + silently treating an unknown body as zero would under-chunk and let + the request blow past the server's POST-body limit. Generators, + iterables, and file-like objects must surface as ``TypeError``.""" + from dataretrieval.waterdata.chunking import _request_bytes + + class _FakeReqWithGenBody: + url = "https://example.com/foo" + body = (b"x" for _ in range(3)) + + with pytest.raises(TypeError, match="cannot size a request body"): + _request_bytes(_FakeReqWithGenBody()) + + +def test_request_bytes_handles_supported_body_types(): + """Sanity-check the supported body types: None (GET), bytes (raw + POST), str (JSON-as-string POST).""" + from dataretrieval.waterdata.chunking import _request_bytes + + class _Req: + def __init__(self, url, body): + self.url = url + self.body = body + + assert _request_bytes(_Req("ab", None)) == 2 + assert _request_bytes(_Req("ab", b"cd")) == 4 + assert _request_bytes(_Req("ab", "cd")) == 4 + assert _request_bytes(_Req("ab", bytearray(b"cd"))) == 4 + + +def test_plan_chunks_rejects_non_positive_max_chunks(): + """``max_chunks < 1`` is meaningless and would silently bypass the + cap on the no-chunking-needed path (initial plan total = 1 and the + in-loop check only runs after a split). Reject early.""" + args = {"monitoring_location_id": ["A", "B", "C", "D"]} + with pytest.raises(ValueError, match="max_chunks must be >= 1"): + _plan_chunks(args, _fake_build, url_limit=1000, max_chunks=0) + with pytest.raises(ValueError, match="max_chunks must be >= 1"): + _plan_chunks(args, _fake_build, url_limit=1000, max_chunks=-5) + + +def test_multi_value_chunked_restores_canonical_url(): + """When chunking fans out, the aggregated response's ``.url`` must + reflect the *user's original* query (rebuilt from the unchunked + args), not the first chunk's URL. Callers logging ``md.url`` for + reproducibility need the full query.""" + sites = ["S" * 10 + str(i) for i in range(4)] + sub_urls: list[str] = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + # Each sub-response carries the chunked sub_args's URL, so + # without canonical restoration the first chunk's URL would + # leak through to md.url. + sub_url = _fake_build(**args).url + sub_urls.append(sub_url) + resp = mock.Mock(elapsed=datetime.timedelta(seconds=0.1)) + resp.headers = {} + resp.url = sub_url + return pd.DataFrame(), resp + + _df, md = fetch({"sites": sites}) + + assert len(sub_urls) > 1, "test setup error: chunker didn't fan out" + # md.url must equal the URL the unchunked query would have produced. + assert md.url == _fake_build(sites=sites).url + # And differ from every sub-request's URL (each carries a smaller list). + assert all(md.url != u for u in sub_urls) + # The canonical URL is strictly bigger byte-wise than any sub-request. + assert all(len(md.url) > len(u) for u in sub_urls) + + +def test_chunkable_params_skips_filter_passed_as_list(): + """Defensive guard: ``filter`` is documented as a string. If a caller + mistakenly passes it as a list, the chunker must NOT treat it as a + multi-value dim — comma-joining CQL clauses inside the URL would + produce a malformed filter expression. The inner ``filters.chunked`` + is the only place that may shrink ``filter``.""" + args = { + "monitoring_location_id": ["USGS-A", "USGS-B"], + "filter": ["a='1'", "a='2'"], # malformed input + "filter_lang": ["cql-text", "cql-json"], # ditto + } + chunkable = _chunkable_params(args) + assert "monitoring_location_id" in chunkable + assert "filter" not in chunkable + assert "filter_lang" not in chunkable + + +# -- ChunkManifest + resume_from -------------------------------------------- + + +def test_chunk_manifest_basic_properties(): + """``total`` is cartesian-product cardinality; ``is_complete`` and + ``remaining`` are derived from ``completed``.""" + plan = _normalize_plan({"a": [["x"], ["y"]], "b": [["1"], ["2"], ["3"]]}) + m = ChunkManifest(plan=plan, completed=0) + assert m.total == 6 + assert m.is_complete is False + assert m.remaining == 6 + + m = ChunkManifest(plan=plan, completed=4) + assert m.remaining == 2 + assert m.is_complete is False + + m = ChunkManifest(plan=plan, completed=6) + assert m.is_complete is True + assert m.remaining == 0 + + +def test_chunk_manifest_is_hashable_and_immutable(): + """``frozen=True`` lets users stash a manifest in a set or use it as + a dict key when bookkeeping partial-resume state across calls.""" + import dataclasses + + plan = _normalize_plan({"a": [["x"], ["y"]]}) + m1 = ChunkManifest(plan=plan, completed=1) + m2 = ChunkManifest(plan=plan, completed=1) + assert m1 == m2 and hash(m1) == hash(m2) + with pytest.raises(dataclasses.FrozenInstanceError): + m1.completed = 99 # type: ignore[misc] + + +def test_normalize_plan_preserves_insertion_order(): + """Cartesian-product iteration order is dict-insertion order; the + normalized plan must keep it stable for the resume validity check.""" + p1 = _normalize_plan({"a": [["1"]], "b": [["x"]]}) + p2 = _normalize_plan({"b": [["x"]], "a": [["1"]]}) # reversed insertion + assert p1 != p2 # order is significant — same chunks, different iteration + + +def test_multi_value_chunked_attaches_manifest_on_success(): + """A successful chunked call attaches the manifest to the aggregated + response so callers can log ``md.chunk_manifest`` and confirm the + request fanned out, even when nothing went wrong.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(remaining=500), + ) + + _df, resp = fetch({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + manifest = resp.chunk_manifest + assert isinstance(manifest, ChunkManifest) + assert manifest.is_complete is True + assert manifest.completed == manifest.total > 1 + + +def test_multi_value_chunked_no_manifest_when_pass_through(): + """If the request fits in one round-trip the wrapper short-circuits + to ``fetch_once`` and no manifest is attached — caller-side + ``md.chunk_manifest`` (via ``BaseMetadata.__init__``'s ``getattr`` + default) would be ``None``.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=8000) + def fetch(args): + # Use a real ``requests.Response`` so attribute presence is honest: + # ``mock.Mock`` autocreates attributes on access, which would mask + # a missing ``chunk_manifest``. + import requests as _r # local to avoid module-level test dependency + + resp = _r.Response() + resp.elapsed = datetime.timedelta(seconds=0.1) + return pd.DataFrame(), resp + + _df, resp = fetch({"sites": ["A", "B"]}) + assert not hasattr(resp, "chunk_manifest") + + +def test_basemetadata_exposes_chunk_manifest(): + """BaseMetadata is the user's view onto a response — the manifest + must reach the caller through it without any extra wrapping.""" + from dataretrieval.utils import BaseMetadata + + plan = _normalize_plan({"a": [["x"], ["y"]]}) + manifest = ChunkManifest(plan=plan, completed=1) + resp = mock.Mock( + url="u", + elapsed=datetime.timedelta(seconds=0), + headers={}, + chunk_manifest=manifest, + ) + md = BaseMetadata(resp) + assert md.chunk_manifest is manifest + + # A response with no chunk_manifest attribute → md.chunk_manifest is None. + resp_plain = mock.Mock( + url="u", elapsed=datetime.timedelta(seconds=0), headers={}, spec=[] + ) + md_plain = BaseMetadata(resp_plain) + assert md_plain.chunk_manifest is None + + +def test_multi_value_chunked_wraps_walk_pages_failure_in_partial_result(): + """When a sub-request raises (e.g. PR #279's ``RuntimeError`` for + mid-pagination failure), the chunker re-raises as ``PartialResult`` + with the underlying exception preserved via ``__cause__`` and a + manifest pinned at the failing sub-request's index — so the caller + can resume from that exact offset.""" + call_count = {"n": 0} + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + call_count["n"] += 1 + if call_count["n"] == 3: + raise RuntimeError("simulated mid-pagination 5xx after 2 pages") + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(remaining=500), + ) + + with pytest.raises(PartialResult) as excinfo: + fetch({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + + err = excinfo.value + assert err.manifest.completed == 2 # two succeeded before the 3rd failed + assert err.manifest.total == 4 + assert isinstance(err.__cause__, RuntimeError) + assert "mid-pagination" in str(err.__cause__) + # Partial frame combines the two successful chunks. + assert len(err.partial_frame) == 2 + # The synthesized response carries the manifest for BaseMetadata. + assert err.partial_response.chunk_manifest is err.manifest + + +def test_partial_result_first_chunk_failure_returns_empty_frame(): + """Edge case: the very first sub-request fails. ``partial_frame`` is + empty, ``manifest.completed`` is 0, but the response still has the + canonical URL and manifest so a resume attempt can validate the plan.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(_args): + raise RuntimeError("transient transport failure") + + with pytest.raises(PartialResult) as excinfo: + fetch({"sites": ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10]}) + + err = excinfo.value + assert err.manifest.completed == 0 + assert err.manifest.total == 4 + assert err.partial_frame.empty + assert err.partial_response.url # canonical URL set even with no responses + + +def test_partial_result_partial_metadata_wraps_response(): + """``PartialResult.partial_metadata`` lazy-wraps the response in + ``BaseMetadata`` so callers can grab the manifest without importing + ``BaseMetadata`` themselves at the catch site.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(_args): + raise RuntimeError("boom") + + with pytest.raises(PartialResult) as excinfo: + fetch({"sites": ["S1" * 10, "S2" * 10]}) + + md = excinfo.value.partial_metadata + assert md.chunk_manifest is excinfo.value.manifest + + +def test_resume_from_skips_completed_chunks(): + """``resume_from`` makes the wrapper skip the first ``manifest.completed`` + cartesian-product combinations and fetch only the remainder. The + returned frame contains only the *new* chunks' data — the caller is + responsible for concatenating their saved partial frame.""" + seen: list[tuple[str, ...]] = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + seen.append(tuple(args["sites"])) + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(remaining=500), + ) + + sites = ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10] + # First, run normally to get the plan that would have been used. + plan = _plan_chunks({"sites": sites}, _fake_build, url_limit=240) + plan_norm = _normalize_plan(plan) + + # Pretend a prior call completed 2 of 4 sub-requests. + prior_manifest = ChunkManifest(plan=plan_norm, completed=2) + md = mock.Mock(chunk_manifest=prior_manifest) + + df, resp = fetch({"sites": sites, "resume_from": md}) + # Only the last 2 cartesian-product combos should have run. + assert len(seen) == 2 + assert len(df) == sum(len(s) for s in seen) + # The returned response's manifest shows the full plan completed. + assert resp.chunk_manifest.is_complete + + +def test_resume_from_rejects_mismatched_plan(): + """If the caller changed their kwargs between the original failure + and the resume call, the fresh plan won't match the manifest's plan. + Silently re-fetching with a different plan would interleave data + from two incompatible queries — raise instead.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(_args): + pytest.fail("should not reach fetch_once on a mismatched-plan resume") + + plan = _plan_chunks( + {"sites": ["S" * 10 + str(i) for i in range(4)]}, + _fake_build, + url_limit=240, + ) + manifest = ChunkManifest(plan=_normalize_plan(plan), completed=1) + md = mock.Mock(chunk_manifest=manifest) + + with pytest.raises(ValueError, match="manifest does not match"): + fetch( + { + "sites": ["T" * 10 + str(i) for i in range(4)], # different values + "resume_from": md, + } + ) + + +def test_resume_from_rejects_no_manifest(): + """A metadata that wasn't returned by a chunked call has no + ``chunk_manifest``. Treating that as "fresh query" silently would + confuse users who think they're resuming — raise.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(_args): + pytest.fail("should not reach fetch_once when resume metadata is invalid") + + md = mock.Mock(chunk_manifest=None) + with pytest.raises(ValueError, match="no chunk_manifest"): + fetch({"sites": ["S" * 10 + str(i) for i in range(4)], "resume_from": md}) + + +def test_resume_from_rejects_complete_manifest(): + """A complete manifest has nothing to resume; the caller probably + has stale state. Raise so they notice rather than re-issuing the + whole plan (which we'd happily skip-to-end).""" + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(_args): + pytest.fail("should not reach fetch_once on an already-complete manifest") + + plan = _plan_chunks( + {"sites": ["S" * 10 + str(i) for i in range(4)]}, + _fake_build, + url_limit=240, + ) + plan_norm = _normalize_plan(plan) + total = math.prod(len(c) for _, c in plan_norm) + md = mock.Mock(chunk_manifest=ChunkManifest(plan=plan_norm, completed=total)) + + with pytest.raises(ValueError, match="already complete"): + fetch({"sites": ["S" * 10 + str(i) for i in range(4)], "resume_from": md}) + + +def test_resume_from_rejects_when_fresh_args_dont_chunk(): + """If the current args fit in one round-trip the wrapper would + short-circuit to ``fetch_once`` — there'd be no plan to skip + against. That state is almost certainly a caller bug (they probably + pruned their site list between calls), so raise.""" + + @multi_value_chunked(build_request=_fake_build, url_limit=8000) + def fetch(_args): + pytest.fail("should not reach fetch_once when resume can't be validated") + + plan = _plan_chunks( + {"sites": ["S" * 10 + str(i) for i in range(4)]}, + _fake_build, + url_limit=240, + ) + manifest = ChunkManifest(plan=_normalize_plan(plan), completed=1) + md = mock.Mock(chunk_manifest=manifest) + + with pytest.raises(ValueError, match="do not produce a chunk plan"): + fetch({"sites": ["A", "B"], "resume_from": md}) + + +def test_resume_from_not_forwarded_to_underlying_fetch(): + """``resume_from`` is a chunker-only control kwarg; it must never + reach ``fetch_once`` (which would forward it to the URL builder as a + bogus query parameter).""" + forwarded: list[dict] = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=240) + def fetch(args): + forwarded.append(args) + return ( + pd.DataFrame({"site": list(args["sites"])}), + _quota_response(remaining=500), + ) + + sites = ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10] + plan = _plan_chunks({"sites": sites}, _fake_build, url_limit=240) + md = mock.Mock( + chunk_manifest=ChunkManifest(plan=_normalize_plan(plan), completed=2) + ) + + fetch({"sites": sites, "resume_from": md}) + assert forwarded, "fetch_once should have been called for the remaining chunks" + for sub_args in forwarded: + assert "resume_from" not in sub_args + + +def test_resume_e2e_quota_exhaust_then_resume_completes_query(): + """End-to-end: a first call exhausts quota partway through; the + caller catches ``QuotaExhausted``, saves ``(partial_df, + partial_metadata)``, then re-runs with ``resume_from=`` to fetch + the rest. Concatenating the two frames reproduces what a single + successful call would have returned.""" + # First call: 4 chunks, quota header decrements 500, 200, 100, then + # 40 (< floor=50). Quota check runs *after* each chunk but only + # gates the *next* one, so 3 chunks land then the 4th is skipped. + remaining_seq_1 = iter([500, 200, 40]) # only need first 3 — abort before 4th + pages_1 = iter(range(3)) + + def fetch_first(args): + return ( + pd.DataFrame( + { + "site": list(args["sites"]), + "page": [next(pages_1)] * len(args["sites"]), + } + ), + _quota_response(next(remaining_seq_1)), + ) + + decorated_first = multi_value_chunked( + build_request=_fake_build, url_limit=240, quota_safety_floor=50 + )(fetch_first) + + sites = ["S1" * 10, "S2" * 10, "S3" * 10, "S4" * 10] + with pytest.raises(QuotaExhausted) as excinfo: + decorated_first({"sites": sites}) + partial_df = excinfo.value.partial_frame + partial_md = excinfo.value.partial_metadata + assert excinfo.value.manifest.completed == 3 + assert excinfo.value.manifest.total == 4 + + # Second call (after quota window resets): same args + resume_from. + # Only the final cartesian combo should run. + def fetch_second(args): + return ( + pd.DataFrame( + {"site": list(args["sites"]), "page": [99] * len(args["sites"])} + ), + _quota_response(remaining=800), + ) + + decorated_second = multi_value_chunked( + build_request=_fake_build, url_limit=240, quota_safety_floor=50 + )(fetch_second) + + rest_df, rest_md = decorated_second({"sites": sites, "resume_from": partial_md}) + assert rest_md.chunk_manifest.is_complete + assert list(rest_df["page"]) == [99] # exactly one chunk fetched on resume + + final = pd.concat([partial_df, rest_df], ignore_index=True) + assert set(final["page"]) == {0, 1, 2, 99} + assert len(final) == 4 # 4 singleton chunks total + + def test_samples_results(): """Test results call for proper columns""" df, _ = get_samples(