diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b7d919b0..80858f4c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -24,10 +24,10 @@ jobs: python-version: [ "3.11", "3.12", "3.13" ] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - - uses: astral-sh/setup-uv@v6 + - uses: actions/checkout@v5 + - uses: astral-sh/setup-uv@v7 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} - name: Install dependencies @@ -44,10 +44,10 @@ jobs: lint: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - - uses: astral-sh/setup-uv@v6 + - uses: actions/checkout@v5 + - uses: astral-sh/setup-uv@v7 - name: Set up Python 3.13 - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: "3.13" - name: Install dependencies @@ -68,10 +68,10 @@ jobs: python-version: [ "3.11", "3.12", "3.13" ] runs-on: opensource-linux-8core steps: - - uses: actions/checkout@v4 - - uses: astral-sh/setup-uv@v6 + - uses: actions/checkout@v5 + - uses: astral-sh/setup-uv@v7 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} - name: Install dependencies @@ -86,6 +86,27 @@ jobs: run: | make test-integration-docker + test_platform_integration: + runs-on: opensource-linux-8core + steps: + - uses: actions/checkout@v5 + - uses: astral-sh/setup-uv@v7 + - name: Set up Python 3.13 + uses: actions/setup-python@v6 + with: + python-version: "3.13" + - name: Install dependencies + env: + UV_LOCKED: "1" + UV_PYTHON: "3.13" + run: make install + - name: Run platform integration tests + env: + UV_PYTHON: "3.13" + UNSTRUCTURED_API_KEY: ${{ secrets.UNSTRUCTURED_API_KEY }} + run: | + make test-integration-platform + test_contract: strategy: fail-fast: false @@ -93,10 +114,10 @@ jobs: python-version: [ "3.11", "3.12", "3.13" ] runs-on: opensource-linux-8core steps: - - uses: actions/checkout@v4 - - uses: astral-sh/setup-uv@v6 + - uses: actions/checkout@v5 + - uses: astral-sh/setup-uv@v7 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/CHANGELOG.md b/CHANGELOG.md index faac2163..5b9e40dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,16 @@ +## 0.43.1 + +### Enhancements +* Add split-PDF observability with operation-aware batch planning, timeout, cancellation, and completion logs. +* Make long-running integration tests stream live progress, timings, and backend failure context for split and single partition phases. + +### Features + +### Fixes +* Preserve chunk-local transport retries for split-PDF execution even when SDK-level retries disable connection-error retries for top-level requests. +* Harden split-PDF timeout and cleanup paths against closed event loops and cancelled chunk tasks. +* Stabilize `hi_res` split integration coverage by using a smaller derived multi-page fixture instead of the flaky full `layout-parser-paper.pdf` path for equivalence and caching checks. + ## 0.42.12 ### Enhancements diff --git a/Makefile b/Makefile index a248b81f..295ff0fc 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,9 @@ PACKAGE_NAME := unstructured-python-client CURRENT_DIR := $(shell pwd) ARCH := $(shell uname -m) DOCKER_IMAGE ?= downloads.unstructured.io/unstructured-io/unstructured-api:latest +INTEGRATION_IGNORE_ARGS := --ignore=_test_unstructured_client/integration/test_platform_workflow_lifecycle.py +INTEGRATION_PYTEST_ARGS := _test_unstructured_client -vv -k integration $(INTEGRATION_IGNORE_ARGS) -o log_cli=true -o log_cli_level=INFO -o log_cli_format="%(asctime)s %(levelname)s %(message)s" --capture=tee-sys --durations=20 --tb=long +PLATFORM_INTEGRATION_PYTEST_ARGS := _test_unstructured_client/integration/test_platform_workflow_lifecycle.py -v -o log_cli=true -o log_cli_level=INFO --durations=20 --tb=long ########### # Install # @@ -36,16 +39,34 @@ test-contract: # Assumes you have unstructured-api running on localhost:8000 .PHONY: test-integration test-integration: - PYTHONPATH=. uv run pytest -n auto _test_unstructured_client -v -k "integration" + PYTHONPATH=. uv run pytest $(INTEGRATION_PYTEST_ARGS) # Runs the unstructured-api in docker for tests .PHONY: test-integration-docker test-integration-docker: - -docker stop unstructured-api && docker kill unstructured-api - docker run --name unstructured-api -p 8000:8000 -d --rm ${DOCKER_IMAGE} --host 0.0.0.0 && \ - curl -s -o /dev/null --retry 10 --retry-delay 5 --retry-all-errors http://localhost:8000/general/docs && \ - PYTHONPATH=. uv run pytest -n auto _test_unstructured_client -v -k "integration" && \ - docker kill unstructured-api + @bash -lc 'set -euo pipefail; \ + container_name=unstructured-api; \ + image="${DOCKER_IMAGE}"; \ + cleanup() { \ + status=$$?; \ + if [ $$status -ne 0 ]; then \ + echo "integration diagnostics image=$$image container=$$container_name"; \ + docker logs "$$container_name" --tail 200 || true; \ + fi; \ + docker kill "$$container_name" >/dev/null 2>&1 || true; \ + exit $$status; \ + }; \ + trap cleanup EXIT; \ + docker stop "$$container_name" >/dev/null 2>&1 || true; \ + docker kill "$$container_name" >/dev/null 2>&1 || true; \ + echo "starting integration api image=$$image"; \ + docker run --name "$$container_name" -p 8000:8000 -d --rm "$$image" --host 0.0.0.0; \ + curl -s -o /dev/null --retry 10 --retry-delay 5 --retry-all-errors http://localhost:8000/general/docs; \ + PYTHONPATH=. uv run pytest $(INTEGRATION_PYTEST_ARGS)' + +.PHONY: test-integration-platform +test-integration-platform: + PYTHONPATH=. uv run pytest $(PLATFORM_INTEGRATION_PYTEST_ARGS) .PHONY: lint lint: diff --git a/_test_unstructured_client/integration/test_decorators.py b/_test_unstructured_client/integration/test_decorators.py index 282a8502..1fd69bfb 100644 --- a/_test_unstructured_client/integration/test_decorators.py +++ b/_test_unstructured_client/integration/test_decorators.py @@ -1,7 +1,9 @@ from __future__ import annotations from collections import Counter, defaultdict +import logging import math +import time import tempfile from pathlib import Path from typing import Literal @@ -12,12 +14,13 @@ import requests from deepdiff import DeepDiff from httpx import Response +from pypdf import PdfReader, PdfWriter from requests_toolbelt.multipart.decoder import MultipartDecoder # type: ignore from unstructured_client import UnstructuredClient from unstructured_client.models import shared, operations -from unstructured_client.models.errors import HTTPValidationError +from unstructured_client.models.errors import HTTPValidationError, SDKError, ServerError from unstructured_client.models.shared.partition_parameters import Strategy from unstructured_client.utils.retries import BackoffStrategy, RetryConfig from unstructured_client._hooks.custom import form_utils @@ -25,8 +28,112 @@ FAKE_KEY = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" TEST_TIMEOUT_MS = 300_000 +LOCAL_API_DOCS_URL = "http://localhost:8000/general/docs" _HI_RES_STRATEGIES = ("hi_res", Strategy.HI_RES) +logger = logging.getLogger("integration.split_pdf") + + +def _log_integration_progress(event: str, **fields) -> None: + rendered_fields = " ".join(f"{key}={value}" for key, value in fields.items()) + print(f"integration event={event} {rendered_fields}", flush=True) + logger.info("integration event=%s %s", event, rendered_fields) + + +def _assert_local_api_is_running() -> None: + started_at = time.perf_counter() + try: + response = requests.get(LOCAL_API_DOCS_URL) + assert response.status_code == 200, "The unstructured-api is not running on localhost:8000" + except requests.exceptions.ConnectionError: + assert False, "The unstructured-api is not running on localhost:8000" + elapsed_ms = round((time.perf_counter() - started_at) * 1000) + _log_integration_progress( + "api_healthcheck", + url=LOCAL_API_DOCS_URL, + status_code=response.status_code, + elapsed_ms=elapsed_ms, + ) + + +@pytest.fixture(scope="module") +def hi_res_stable_fixture_path(tmp_path_factory) -> str: + """Create a smaller multi-page PDF subset for stable hi_res integration coverage. + + The full 16-page `layout-parser-paper.pdf` is intermittently unstable in the + backend's unsplit hi_res path under long integration runs. We still want a + real multi-page document that exercises split behavior, but with less backend + stress and better determinism. + """ + source_path = Path("_sample_docs/layout-parser-paper.pdf") + output_dir = tmp_path_factory.mktemp("hi_res_fixture") + output_path = output_dir / "layout-parser-paper-hi_res-subset.pdf" + + reader = PdfReader(str(source_path)) + writer = PdfWriter() + for page in reader.pages[:4]: + writer.add_page(page) + + with output_path.open("wb") as output_file: + writer.write(output_file) + + return str(output_path) + + +def _resolve_test_filename( + filename: str, + strategy, + hi_res_stable_fixture_path: str, +) -> str: + if strategy in _HI_RES_STRATEGIES and Path(filename).name == "layout-parser-paper.pdf": + return hi_res_stable_fixture_path + return filename + + +def _describe_partition_exception(exc: Exception) -> str: + if isinstance(exc, (HTTPValidationError, SDKError, ServerError)): + status_code = getattr(exc, "status_code", "unknown") + body = getattr(exc, "body", "") + headers = getattr(exc, "headers", {}) + return ( + f"type={type(exc).__name__} status_code={status_code} " + f"split_operation_id={headers.get('X-Unstructured-Split-Operation-Id', 'missing')} " + f"split_chunk_index={headers.get('X-Unstructured-Split-Chunk-Index', 'missing')} " + f"body={body}" + ) + return f"type={type(exc).__name__} error={exc}" + + +def _run_partition_with_progress( + client: UnstructuredClient, + *, + request: operations.PartitionRequest, + server_url: str, + case_context: str, + phase: str, +): + _log_integration_progress("partition_start", case_context=case_context, phase=phase) + started_at = time.perf_counter() + try: + response = client.general.partition(server_url=server_url, request=request) + except Exception as exc: + _log_integration_progress( + "partition_error", + case_context=case_context, + phase=phase, + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + details=_describe_partition_exception(exc), + ) + raise + _log_integration_progress( + "partition_complete", + case_context=case_context, + phase=phase, + status_code=response.status_code, + element_count=len(response.elements) if response.elements is not None else 0, + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + ) + return response def _allowed_delta(expected: int, *, absolute: int, ratio: float) -> int: @@ -87,7 +194,14 @@ def _assert_hi_res_output_is_similar(resp_split, resp_single): ) -def _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, extra_exclude_paths=None): +def _assert_split_unsplit_equivalent( + resp_split, + resp_single, + strategy, + *, + case_context: str = "", + extra_exclude_paths=None, +): """Compare split-PDF and single-request responses. For hi_res (OCR-based), splitting changes per-page context so text and @@ -95,13 +209,21 @@ def _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, extra_ex and text volume so split requests cannot silently drift too far. For deterministic strategies (fast, etc.) we keep strict DeepDiff equality. """ - assert resp_split.status_code == resp_single.status_code - assert resp_split.content_type == resp_single.content_type + context_prefix = f"{case_context}: " if case_context else "" + + assert resp_split.status_code == resp_single.status_code, ( + f"{context_prefix}status mismatch split={resp_split.status_code} single={resp_single.status_code}" + ) + assert resp_split.content_type == resp_single.content_type, ( + f"{context_prefix}content_type mismatch split={resp_split.content_type} single={resp_single.content_type}" + ) if strategy in _HI_RES_STRATEGIES: _assert_hi_res_output_is_similar(resp_split, resp_single) else: - assert len(resp_split.elements) == len(resp_single.elements) + assert len(resp_split.elements) == len(resp_single.elements), ( + f"{context_prefix}element_count mismatch split={len(resp_split.elements)} single={len(resp_single.elements)}" + ) excludes = [r"root\[\d+\]\['metadata'\]\['parent_id'\]"] if extra_exclude_paths: @@ -112,7 +234,7 @@ def _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, extra_ex t2=resp_single.elements, exclude_regex_paths=excludes, ) - assert len(diff) == 0 + assert len(diff) == 0, f"{context_prefix}DeepDiff mismatch: {diff}" @pytest.mark.parametrize("concurrency_level", [1, 2, 5]) @@ -130,7 +252,11 @@ def _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, extra_ex ], ) def test_integration_split_pdf_has_same_output_as_non_split( - concurrency_level: int, filename: str, expected_ok: bool, strategy: str + concurrency_level: int, + filename: str, + expected_ok: bool, + strategy: str, + hi_res_stable_fixture_path: str, ): """ Tests that output that we get from the split-by-page pdf is the same as from non-split. @@ -138,18 +264,19 @@ def test_integration_split_pdf_has_same_output_as_non_split( Requires unstructured-api running in bg. See Makefile for how to run it. Doesn't check for raw_response as there's no clear patter for how it changes with the number of pages / concurrency_level. """ - try: - response = requests.get("http://localhost:8000/general/docs") - assert response.status_code == 200, "The unstructured-api is not running on localhost:8000" - except requests.exceptions.ConnectionError: - assert False, "The unstructured-api is not running on localhost:8000" + _assert_local_api_is_running() + resolved_filename = _resolve_test_filename(filename, strategy, hi_res_stable_fixture_path) client = UnstructuredClient(api_key_auth=FAKE_KEY, timeout_ms=TEST_TIMEOUT_MS) + case_context = ( + f"test=split_equivalence file={Path(resolved_filename).name} strategy={strategy} " + f"concurrency={concurrency_level} expected_ok={expected_ok}" + ) - with open(filename, "rb") as f: + with open(resolved_filename, "rb") as f: files = shared.Files( content=f.read(), - file_name=filename, + file_name=Path(resolved_filename).name, ) if not expected_ok: @@ -169,16 +296,26 @@ def test_integration_split_pdf_has_same_output_as_non_split( ) try: - resp_split = client.general.partition( + resp_split = _run_partition_with_progress( + client, + request=req, server_url="http://localhost:8000", - request=req + case_context=case_context, + phase="split", ) - except (HTTPValidationError, AttributeError) as exc: + except Exception as exc: if not expected_ok: assert "File does not appear to be a valid PDF" in str(exc) + _log_integration_progress( + "partition_expected_failure", + case_context=case_context, + phase="split", + error_type=type(exc).__name__, + ) return - else: - assert exc is None + raise AssertionError( + f"{case_context}: unexpected split failure {_describe_partition_exception(exc)}" + ) from exc parameters.split_pdf_page = False @@ -186,12 +323,15 @@ def test_integration_split_pdf_has_same_output_as_non_split( partition_parameters=parameters ) - resp_single = client.general.partition( - server_url="http://localhost:8000", + resp_single = _run_partition_with_progress( + client, request=req, + server_url="http://localhost:8000", + case_context=case_context, + phase="single", ) - _assert_split_unsplit_equivalent(resp_split, resp_single, strategy) + _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, case_context=case_context) @pytest.mark.parametrize(("filename", "expected_ok", "strategy"), [ @@ -209,19 +349,21 @@ def test_integration_split_pdf_with_caching( strategy: Literal[Strategy.HI_RES], use_caching: bool, cache_dir: Path | None, + hi_res_stable_fixture_path: str, ): - try: - response = requests.get("http://localhost:8000/general/docs") - assert response.status_code == 200, "The unstructured-api is not running on localhost:8000" - except requests.exceptions.ConnectionError: - assert False, "The unstructured-api is not running on localhost:8000" + _assert_local_api_is_running() + resolved_filename = _resolve_test_filename(filename, strategy, hi_res_stable_fixture_path) client = UnstructuredClient(api_key_auth=FAKE_KEY, timeout_ms=TEST_TIMEOUT_MS) + case_context = ( + f"test=split_caching file={Path(resolved_filename).name} strategy={strategy} " + f"use_caching={use_caching} cache_dir={cache_dir}" + ) - with open(filename, "rb") as f: + with open(resolved_filename, "rb") as f: files = shared.Files( content=f.read(), - file_name=filename, + file_name=Path(resolved_filename).name, ) if not expected_ok: @@ -241,16 +383,26 @@ def test_integration_split_pdf_with_caching( ) try: - resp_split = client.general.partition( + resp_split = _run_partition_with_progress( + client, + request=req, server_url="http://localhost:8000", - request=req + case_context=case_context, + phase="split", ) - except (HTTPValidationError, AttributeError) as exc: + except Exception as exc: if not expected_ok: assert "File does not appear to be a valid PDF" in str(exc) + _log_integration_progress( + "partition_expected_failure", + case_context=case_context, + phase="split", + error_type=type(exc).__name__, + ) return - else: - assert exc is None + raise AssertionError( + f"{case_context}: unexpected split failure {_describe_partition_exception(exc)}" + ) from exc parameters.split_pdf_page = False @@ -258,12 +410,15 @@ def test_integration_split_pdf_with_caching( partition_parameters=parameters ) - resp_single = client.general.partition( + resp_single = _run_partition_with_progress( + client, + request=req, server_url="http://localhost:8000", - request=req + case_context=case_context, + phase="single", ) - _assert_split_unsplit_equivalent(resp_split, resp_single, strategy) + _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, case_context=case_context) # make sure the cache dir was cleaned if passed explicitly if cache_dir: @@ -272,6 +427,12 @@ def test_integration_split_pdf_with_caching( @pytest.mark.parametrize("filename", ["_sample_docs/super_long_pages.pdf"]) def test_long_pages_hi_res(filename): + _log_integration_progress( + "long_hi_res_start", + file=Path(filename).name, + strategy=shared.Strategy.HI_RES, + concurrency=15, + ) req = operations.PartitionRequest(partition_parameters=shared.PartitionParameters( files=shared.Files(content=open(filename, "rb"), file_name=filename, ), strategy=shared.Strategy.HI_RES, @@ -286,6 +447,12 @@ def test_long_pages_hi_res(filename): request=req, server_url="http://localhost:8000", ) + _log_integration_progress( + "long_hi_res_complete", + file=Path(filename).name, + status_code=response.status_code, + element_count=len(response.elements), + ) assert response.status_code == 200 assert len(response.elements) @@ -293,11 +460,7 @@ def test_integration_split_pdf_for_file_with_no_name(): """ Tests that the client raises an error when the file_name is empty. """ - try: - response = requests.get("http://localhost:8000/general/docs") - assert response.status_code == 200, "The unstructured-api is not running on localhost:8000" - except requests.exceptions.ConnectionError: - assert False, "The unstructured-api is not running on localhost:8000" + _assert_local_api_is_running() client = UnstructuredClient(api_key_auth=FAKE_KEY, timeout_ms=TEST_TIMEOUT_MS) @@ -349,11 +512,7 @@ def test_integration_split_pdf_with_page_range( Requires unstructured-api running in bg. See Makefile for how to run it. """ - try: - response = requests.get("http://localhost:8000/general/docs") - assert response.status_code == 200, "The unstructured-api is not running on localhost:8000" - except requests.exceptions.ConnectionError: - assert False, "The unstructured-api is not running on localhost:8000" + _assert_local_api_is_running() client = UnstructuredClient(api_key_auth=FAKE_KEY, timeout_ms=TEST_TIMEOUT_MS) @@ -375,9 +534,15 @@ def test_integration_split_pdf_with_page_range( ) try: - resp = client.general.partition( + resp = _run_partition_with_progress( + client, + request=req, server_url="http://localhost:8000", - request=req + case_context=( + f"test=page_range file={Path(filename).name} page_range={page_range} " + f"starting_page_number={starting_page_number}" + ), + phase="split", ) except ValueError as exc: assert not expected_ok @@ -410,21 +575,23 @@ def test_integration_split_pdf_strict_mode( filename: str, expected_ok: bool, strategy: shared.Strategy, - caplog + caplog, + hi_res_stable_fixture_path: str, ): """Test strict mode (allow failed = False) for split_pdf.""" - try: - response = requests.get("http://localhost:8000/general/docs") - assert response.status_code == 200, "The unstructured-api is not running on localhost:8000" - except requests.exceptions.ConnectionError: - assert False, "The unstructured-api is not running on localhost:8000" + _assert_local_api_is_running() + resolved_filename = _resolve_test_filename(filename, strategy, hi_res_stable_fixture_path) client = UnstructuredClient(api_key_auth=FAKE_KEY, timeout_ms=TEST_TIMEOUT_MS) + case_context = ( + f"test=strict_mode file={Path(resolved_filename).name} strategy={strategy} " + f"concurrency={concurrency_level} allow_failed={allow_failed}" + ) - with open(filename, "rb") as f: + with open(resolved_filename, "rb") as f: files = shared.Files( content=f.read(), - file_name=filename, + file_name=Path(resolved_filename).name, ) if not expected_ok: @@ -445,17 +612,27 @@ def test_integration_split_pdf_strict_mode( ) try: - resp_split = client.general.partition( + resp_split = _run_partition_with_progress( + client, + request=req, server_url="http://localhost:8000", - request=req + case_context=case_context, + phase="split", ) - except (HTTPValidationError, AttributeError) as exc: + except Exception as exc: if not expected_ok: assert "The file does not appear to be a valid PDF." in caplog.text assert "File does not appear to be a valid PDF" in str(exc) + _log_integration_progress( + "partition_expected_failure", + case_context=case_context, + phase="split", + error_type=type(exc).__name__, + ) return - else: - assert exc is None + raise AssertionError( + f"{case_context}: unexpected split failure {_describe_partition_exception(exc)}" + ) from exc parameters.split_pdf_page = False @@ -463,12 +640,15 @@ def test_integration_split_pdf_strict_mode( partition_parameters=parameters ) - resp_single = client.general.partition( + resp_single = _run_partition_with_progress( + client, request=req, server_url="http://localhost:8000", + case_context=case_context, + phase="single", ) - _assert_split_unsplit_equivalent(resp_split, resp_single, strategy) + _assert_split_unsplit_equivalent(resp_split, resp_single, strategy, case_context=case_context) @pytest.mark.asyncio @@ -560,3 +740,76 @@ async def mock_send(_, request: httpx.Request, **kwargs): assert mock_endpoint_called assert res.status_code == 200 + + +@pytest.mark.asyncio +async def test_split_pdf_transport_errors_still_retry_when_sdk_disables_connection_retries( + monkeypatch, +): + mock_endpoint_called = False + number_of_transport_failures = 2 + + async def mock_send(_, request: httpx.Request, **kwargs): + nonlocal mock_endpoint_called + if request.url.host == "localhost" and "docs" in request.url.path: + mock_endpoint_called = True + return Response(200, request=request) + elif "docs" in request.url.path: + assert False, "The server URL was not set in the dummy request" + + request_body = request.read() + decoded_body = MultipartDecoder(request_body, request.headers.get("Content-Type")) + form_data = form_utils.parse_form_data(decoded_body) + + nonlocal number_of_transport_failures + if ( + number_of_transport_failures > 0 + and "starting_page_number" in form_data + and int(form_data["starting_page_number"]) < 3 + ): + number_of_transport_failures -= 1 + raise httpx.ConnectError("transient connect error", request=request) + + mock_return_data = [{ + "type": "Title", + "text": "Hello", + }] + + return Response( + 200, + request=request, + content=json.dumps(mock_return_data), + headers={"Content-Type": "application/json"}, + ) + + monkeypatch.setattr(split_pdf_hook.httpx.AsyncClient, "send", mock_send) + + sdk = UnstructuredClient( + api_key_auth=FAKE_KEY, + retry_config=RetryConfig("backoff", BackoffStrategy(200, 1000, 1.5, 10000), False), + ) + + filename = "_sample_docs/layout-parser-paper.pdf" + with open(filename, "rb") as f: + files = shared.Files( + content=f.read(), + file_name=filename, + ) + + req = operations.PartitionRequest( + partition_parameters=shared.PartitionParameters( + files=files, + split_pdf_page=True, + split_pdf_allow_failed=False, + strategy="fast", + ) + ) + + res = await sdk.general.partition_async( + server_url="http://localhost:8000", + request=req, + ) + + assert number_of_transport_failures == 0 + assert mock_endpoint_called + assert res.status_code == 200 diff --git a/_test_unstructured_client/integration/test_integration.py b/_test_unstructured_client/integration/test_integration.py index b059bb67..299ee872 100644 --- a/_test_unstructured_client/integration/test_integration.py +++ b/_test_unstructured_client/integration/test_integration.py @@ -2,8 +2,10 @@ import asyncio import json +import logging import os from pathlib import Path +import time from deepdiff import DeepDiff import pytest @@ -15,6 +17,13 @@ FAKE_KEY = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" LOCAL_API_URL = "http://localhost:8000" +logger = logging.getLogger("integration.split_pdf") + + +def _log_integration_progress(event: str, **fields) -> None: + rendered_fields = " ".join(f"{key}={value}" for key, value in fields.items()) + print(f"integration event={event} {rendered_fields}", flush=True) + logger.info("integration event=%s %s", event, rendered_fields) @pytest.fixture(scope="function") @@ -51,9 +60,19 @@ def test_partition_strategies(split_pdf, strategy, client, doc_path): ) ) + case_context = f"test=partition_strategies file={filename} strategy={strategy} split_pdf={split_pdf}" + _log_integration_progress("partition_start", case_context=case_context) + started_at = time.perf_counter() response = client.general.partition( request=req ) + _log_integration_progress( + "partition_complete", + case_context=case_context, + status_code=response.status_code, + element_count=len(response.elements), + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + ) assert response.status_code == 200 assert len(response.elements) @@ -126,7 +145,17 @@ async def test_partition_async_returns_elements(client, doc_path): ) ) + _log_integration_progress("partition_async_start", file=filename, strategy="fast", split_pdf=True) + started_at = time.perf_counter() response = await client.general.partition_async(request=req) + _log_integration_progress( + "partition_async_complete", + file=filename, + strategy="fast", + status_code=response.status_code, + element_count=len(response.elements), + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + ) assert response.status_code == 200 assert len(response.elements) @@ -168,15 +197,30 @@ async def test_partition_async_processes_concurrent_files(client, doc_path): ] serial_results = [] + _log_integration_progress("partition_async_serial_start", request_count=len(requests), file=filename) for req in requests: + started_at = time.perf_counter() res = await client.general.partition_async(request=req) assert res.status_code == 200 serial_results.append(res.elements) + _log_integration_progress( + "partition_async_serial_complete", + status_code=res.status_code, + element_count=len(res.elements), + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + ) + _log_integration_progress("partition_async_concurrent_start", request_count=len(requests), file=filename) + started_at = time.perf_counter() results = await asyncio.gather( client.general.partition_async(request=requests[0]), client.general.partition_async(request=requests[1]) ) + _log_integration_progress( + "partition_async_concurrent_complete", + request_count=len(results), + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + ) concurrent_results = [] for res in results: @@ -193,8 +237,9 @@ async def test_partition_async_processes_concurrent_files(client, doc_path): def test_uvloop_partitions_without_errors(client, doc_path): """Test that we can use pdf splitting within another asyncio loop.""" + filename = "layout-parser-paper-fast.pdf" + async def call_api(): - filename = "layout-parser-paper-fast.pdf" with open(doc_path / filename, "rb") as f: files = shared.Files( content=f.read(), @@ -220,8 +265,14 @@ async def call_api(): return [] import uvloop - uvloop.install() - elements = asyncio.run(call_api()) + started_at = time.perf_counter() + elements = uvloop.run(call_api()) + _log_integration_progress( + "uvloop_partition_complete", + file=filename, + element_count=len(elements), + elapsed_ms=round((time.perf_counter() - started_at) * 1000), + ) assert len(elements) > 0 diff --git a/_test_unstructured_client/integration/test_platform_workflow_lifecycle.py b/_test_unstructured_client/integration/test_platform_workflow_lifecycle.py index 6515398e..8e721272 100644 --- a/_test_unstructured_client/integration/test_platform_workflow_lifecycle.py +++ b/_test_unstructured_client/integration/test_platform_workflow_lifecycle.py @@ -17,14 +17,34 @@ import json import os +import time from pathlib import Path -from typing import Optional +from typing import Callable, Optional, TypeVar import pytest from unstructured_client import UnstructuredClient from unstructured_client.models import shared, operations -from unstructured_client.models.errors import SDKError +from unstructured_client.models.errors import SDKError, UnstructuredClientError +T = TypeVar("T") + +PLATFORM_429_MAX_ATTEMPTS = 5 +PLATFORM_429_INITIAL_DELAY_SECONDS = 5.0 +PLATFORM_429_MAX_DELAY_SECONDS = 30.0 + + +def call_with_rate_limit_retry(func: Callable[..., T], *args, **kwargs) -> T: + delay_seconds = PLATFORM_429_INITIAL_DELAY_SECONDS + for attempt in range(1, PLATFORM_429_MAX_ATTEMPTS + 1): + try: + return func(*args, **kwargs) + except UnstructuredClientError as exc: + if exc.status_code != 429 or attempt == PLATFORM_429_MAX_ATTEMPTS: + raise + time.sleep(delay_seconds) + delay_seconds = min(delay_seconds * 2, PLATFORM_429_MAX_DELAY_SECONDS) + + raise RuntimeError("429 retry loop exited unexpectedly") @pytest.fixture(scope="module") def doc_path() -> Path: @@ -54,7 +74,8 @@ def created_workflow_id(platform_client: UnstructuredClient) -> Optional[str]: workflow_id = None try: # Create a workflow for testing - create_response = platform_client.workflows.create_workflow( + create_response = call_with_rate_limit_retry( + platform_client.workflows.create_workflow, request=operations.CreateWorkflowRequest( create_workflow=shared.CreateWorkflow( name="test_integration_workflow", @@ -71,11 +92,13 @@ def created_workflow_id(platform_client: UnstructuredClient) -> Optional[str]: if workflow_id: try: # Try to get the workflow first to see if it still exists - platform_client.workflows.get_workflow( + call_with_rate_limit_retry( + platform_client.workflows.get_workflow, request=operations.GetWorkflowRequest(workflow_id=workflow_id) ) # If we get here, it exists, so delete it - platform_client.workflows.delete_workflow( + call_with_rate_limit_retry( + platform_client.workflows.delete_workflow, request=operations.DeleteWorkflowRequest(workflow_id=workflow_id) ) except SDKError: @@ -94,7 +117,8 @@ def test_workflow_lifecycle( Test the complete workflow lifecycle including workflows, jobs, and templates. """ # 1. List workflows - list_response = platform_client.workflows.list_workflows( + list_response = call_with_rate_limit_retry( + platform_client.workflows.list_workflows, request=operations.ListWorkflowsRequest() ) assert list_response.status_code == 200 @@ -102,7 +126,8 @@ def test_workflow_lifecycle( # 2. Get workflow (using the created workflow) if created_workflow_id: - get_response = platform_client.workflows.get_workflow( + get_response = call_with_rate_limit_retry( + platform_client.workflows.get_workflow, request=operations.GetWorkflowRequest(workflow_id=created_workflow_id) ) assert get_response.status_code == 200 @@ -110,7 +135,8 @@ def test_workflow_lifecycle( assert get_response.workflow_information.name == "test_integration_workflow" # 3. List templates - list_templates_response = platform_client.templates.list_templates( + list_templates_response = call_with_rate_limit_retry( + platform_client.templates.list_templates, request=operations.ListTemplatesRequest() ) assert list_templates_response.status_code == 200 @@ -128,7 +154,8 @@ def test_workflow_lifecycle( if template_id not in template_ids and len(templates) > 0: template_id = templates[0].id - get_template_response = platform_client.templates.get_template( + get_template_response = call_with_rate_limit_retry( + platform_client.templates.get_template, request=operations.GetTemplateRequest(template_id=template_id) ) assert get_template_response.status_code == 200 @@ -152,7 +179,8 @@ def test_workflow_lifecycle( with open(pdf_path, "rb") as f: pdf_content = f.read() - create_job_response = platform_client.jobs.create_job( + create_job_response = call_with_rate_limit_retry( + platform_client.jobs.create_job, request=operations.CreateJobRequest( body_create_job=shared.BodyCreateJob( request_data=request_data, @@ -172,14 +200,16 @@ def test_workflow_lifecycle( assert create_job_response.job_information.status in ["SCHEDULED", "IN_PROGRESS"] # 6. Get job - get_job_response = platform_client.jobs.get_job( + get_job_response = call_with_rate_limit_retry( + platform_client.jobs.get_job, request=operations.GetJobRequest(job_id=job_id) ) assert get_job_response.status_code == 200 assert str(get_job_response.job_information.id) == job_id # 7. List jobs - list_jobs_response = platform_client.jobs.list_jobs( + list_jobs_response = call_with_rate_limit_retry( + platform_client.jobs.list_jobs, request=operations.ListJobsRequest() ) assert list_jobs_response.status_code == 200 @@ -187,7 +217,8 @@ def test_workflow_lifecycle( # 8. Delete workflow (cleanup is handled by fixture, but we can verify it works) if created_workflow_id: - delete_response = platform_client.workflows.delete_workflow( + delete_response = call_with_rate_limit_retry( + platform_client.workflows.delete_workflow, request=operations.DeleteWorkflowRequest(workflow_id=created_workflow_id) ) assert delete_response.status_code in [200, 204] @@ -198,7 +229,8 @@ def test_workflow_lifecycle_with_custom_dag_job(platform_client: UnstructuredCli Test creating a job with a custom DAG (ephemeral job type). """ # 1. List templates to understand the structure - list_templates_response = platform_client.templates.list_templates( + list_templates_response = call_with_rate_limit_retry( + platform_client.templates.list_templates, request=operations.ListTemplatesRequest() ) assert list_templates_response.status_code == 200 @@ -222,7 +254,8 @@ def test_workflow_lifecycle_with_custom_dag_job(platform_client: UnstructuredCli "job_nodes": custom_nodes, }) - create_job_response = platform_client.jobs.create_job( + create_job_response = call_with_rate_limit_retry( + platform_client.jobs.create_job, request=operations.CreateJobRequest( body_create_job=shared.BodyCreateJob( request_data=request_data, @@ -234,7 +267,8 @@ def test_workflow_lifecycle_with_custom_dag_job(platform_client: UnstructuredCli assert job_id is not None # 3. Verify the job can be retrieved - get_job_response = platform_client.jobs.get_job( + get_job_response = call_with_rate_limit_retry( + platform_client.jobs.get_job, request=operations.GetJobRequest(job_id=job_id) ) assert get_job_response.status_code == 200 diff --git a/_test_unstructured_client/unit/test_custom_hooks.py b/_test_unstructured_client/unit/test_custom_hooks.py index 1fd8f805..cc8e980b 100644 --- a/_test_unstructured_client/unit/test_custom_hooks.py +++ b/_test_unstructured_client/unit/test_custom_hooks.py @@ -169,7 +169,7 @@ def mock_post(request): with pytest.raises(Exception): session.general.partition(request=req, retries=retries) - pattern = re.compile("Failed to process a request due to connection error .*? " + pattern = re.compile("Failed to process a request due to transport error .*? " "Attempting retry number 1 after sleep.") assert bool(pattern.search(caplog.text)) diff --git a/_test_unstructured_client/unit/test_split_pdf_hook.py b/_test_unstructured_client/unit/test_split_pdf_hook.py index 0900b709..a05e1a9d 100644 --- a/_test_unstructured_client/unit/test_split_pdf_hook.py +++ b/_test_unstructured_client/unit/test_split_pdf_hook.py @@ -2,11 +2,13 @@ import asyncio import io +import logging from asyncio import Task from collections import Counter +from concurrent import futures from functools import partial from pathlib import Path -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest @@ -28,12 +30,16 @@ MAX_CONCURRENCY_LEVEL, MAX_PAGES_PER_SPLIT, MIN_PAGES_PER_SPLIT, + SPLIT_PDF_HEADER_PREFIX, SplitPdfHook, _get_request_timeout_seconds, - get_optimal_split_size, run_tasks, + get_optimal_split_size, + run_tasks, ) -from unstructured_client._hooks.types import BeforeRequestContext +from unstructured_client._hooks.types import AfterErrorContext, AfterSuccessContext, BeforeRequestContext from unstructured_client.models import shared +from unstructured_client.types import UNSET +from unstructured_client.utils import BackoffStrategy, RetryConfig def test_unit_clear_operation(): @@ -41,10 +47,7 @@ def test_unit_clear_operation(): hook = SplitPdfHook() operation_id = "some_id" - async def example(): - pass - - hook.coroutines_to_execute[operation_id] = [example(), example()] + hook.coroutines_to_execute[operation_id] = [MagicMock(), MagicMock()] hook.api_successful_responses[operation_id] = [ requests.Response(), requests.Response(), @@ -87,7 +90,7 @@ def test_unit_prepare_request_headers(): headers = request_utils.prepare_request_headers(test_headers) assert headers != test_headers - assert headers, expected_headers + assert dict(headers) == expected_headers def test_unit_create_response(): @@ -100,9 +103,9 @@ def test_unit_create_response(): response = request_utils.create_response(test_elements) - assert response.status_code, expected_status_code - assert response._content, expected_content - assert response.headers.get("Content-Length"), expected_content_length + assert response.status_code == expected_status_code + assert response._content == expected_content + assert response.headers.get("Content-Length") == expected_content_length def test_unit_decode_content_disposition(): @@ -544,23 +547,95 @@ def test_before_request_raises_pdf_validation_error_when_pdf_check_fails(): mock_check_pdf.assert_called_once_with(mock_pdf_reader) -def _make_hook_with_split_request(): - """Helper: run before_request with mocked PDF parsing so it returns a dummy request.""" - hook = SplitPdfHook() - mock_client = httpx.Client() - hook.sdk_init(base_url="http://localhost:8888", client=mock_client) +_MISSING = object() - mock_hook_ctx = MagicMock() - mock_hook_ctx.operation_id = "partition" - mock_hook_ctx.config.timeout_ms = 12_000 - mock_request = MagicMock(spec=httpx.Request) - mock_request.headers = {"Content-Type": "multipart/form-data"} - mock_request.url = httpx.URL("http://localhost:8888/general/v0/general") - mock_request.extensions = {"timeout": {"connect": 12.0, "read": 12.0, "write": 12.0, "pool": 12.0}} +def _httpx_response(content: str, status_code: int = 200) -> httpx.Response: + return httpx.Response( + status_code=status_code, + content=content.encode(), + request=httpx.Request("POST", "http://localhost:8888/general/v0/general"), + ) + + +def _httpx_json_response(payload: list[dict], status_code: int = 200) -> httpx.Response: + return httpx.Response( + status_code=status_code, + json=payload, + request=httpx.Request("POST", "http://localhost:8888/general/v0/general"), + ) + + +async def _transport_error_request( + async_client: httpx.AsyncClient, # pragma: no cover - signature compatibility + limiter: asyncio.Semaphore, # pragma: no cover - signature compatibility + error_cls: type[httpx.TransportError], + request_id: str, +): + raise error_cls( + f"transport failure for {request_id}", + request=httpx.Request("POST", f"http://localhost:8888/chunk/{request_id}"), + ) + + +async def _slow_success_request( + async_client: httpx.AsyncClient, # pragma: no cover - signature compatibility + limiter: asyncio.Semaphore, # pragma: no cover - signature compatibility + content: str, +) -> httpx.Response: + await asyncio.sleep(0.05) + return _httpx_response(content) + + +async def _cancelled_request( + async_client: httpx.AsyncClient, # pragma: no cover - signature compatibility + limiter: asyncio.Semaphore, # pragma: no cover - signature compatibility +) -> httpx.Response: + raise asyncio.CancelledError() + + +def _make_hook_with_split_request( + hook: SplitPdfHook | None = None, + *, + timeout_extension: object = _MISSING, + config_timeout_ms: int | None = 12_000, + retry_config: RetryConfig | object = UNSET, + allow_failed: str | None = None, + cache_tmp_data: str | None = None, + pdf_chunks: list[tuple[io.BytesIO, int]] | None = None, +): + """Helper: run before_request with mocked PDF parsing so it returns a dummy request.""" + hook = hook or SplitPdfHook() + if hook.client is None: + hook.sdk_init(base_url="http://localhost:8888", client=httpx.Client()) + + hook_ctx = MagicMock(spec=BeforeRequestContext) + hook_ctx.operation_id = "partition" + hook_ctx.config = MagicMock() + hook_ctx.config.timeout_ms = config_timeout_ms + hook_ctx.config.retry_config = retry_config + + request_extensions: dict[str, object] = {} + if timeout_extension is not _MISSING and timeout_extension is not None: + request_extensions["timeout"] = timeout_extension + elif config_timeout_ms is not None: + timeout_seconds = config_timeout_ms / 1000 + request_extensions["timeout"] = { + "connect": timeout_seconds, + "read": timeout_seconds, + "write": timeout_seconds, + "pool": timeout_seconds, + } + + request = httpx.Request( + "POST", + "http://localhost:8888/general/v0/general", + headers={"Content-Type": "multipart/form-data"}, + extensions=request_extensions, + ) mock_pdf_file = MagicMock() - mock_form_data = { + form_data = { "split_pdf_page": "true", "strategy": "fast", "files": { @@ -569,6 +644,11 @@ def _make_hook_with_split_request(): "file": mock_pdf_file, }, } + if allow_failed is not None: + form_data["split_pdf_allow_failed"] = allow_failed + if cache_tmp_data is not None: + form_data["split_pdf_cache_tmp_data"] = cache_tmp_data + mock_pdf_reader = MagicMock() mock_pdf_reader.get_num_pages.return_value = 100 mock_pdf_reader.pages = [MagicMock()] * 100 @@ -579,25 +659,43 @@ def _make_hook_with_split_request(): patch("unstructured_client._hooks.custom.pdf_utils.check_pdf") as mock_check_pdf, \ patch("unstructured_client._hooks.custom.request_utils.get_base_url") as mock_get_base_url, \ patch.object(hook, "_trim_large_pages", side_effect=lambda pdf, fd: pdf), \ - patch.object(hook, "_get_pdf_chunks_in_memory", return_value=[]): - mock_get_fields.return_value = mock_form_data + patch.object(hook, "_get_pdf_chunk_paths", return_value=[]), \ + patch.object(hook, "_get_pdf_chunks_in_memory", return_value=pdf_chunks or []): + mock_get_fields.return_value = form_data mock_read_pdf.return_value = mock_pdf_reader mock_check_pdf.return_value = mock_pdf_reader mock_get_base_url.return_value = "http://localhost:8888" - result = hook.before_request(mock_hook_ctx, mock_request) + result = hook.before_request(hook_ctx, request) - return hook, mock_hook_ctx, result + return hook, hook_ctx, result def test_before_request_returns_dummy_with_timeout_and_operation_id(): hook, mock_hook_ctx, result = _make_hook_with_split_request() + operation_id = result.headers["operation_id"] assert isinstance(result, httpx.Request) assert str(result.url) == "http://localhost:8888/general/docs" - assert result.headers["operation_id"] + assert operation_id assert result.extensions["timeout"]["read"] == 12.0 - assert mock_hook_ctx.operation_id in hook.pending_operation_ids + assert result.extensions["split_pdf_operation_id"] == operation_id + assert operation_id in hook.pending_operation_ids + + +def test_before_request_logs_split_plan(caplog: pytest.LogCaptureFixture): + caplog.set_level(logging.INFO, logger="unstructured-client") + + _, _, result = _make_hook_with_split_request( + allow_failed="true", + pdf_chunks=[(io.BytesIO(b"chunk-1"), 0), (io.BytesIO(b"chunk-2"), 2)], + ) + + operation_id = result.headers["operation_id"] + assert f"event=plan_created operation_id={operation_id}" in caplog.text + assert "chunk_count=2" in caplog.text + assert "allow_failed=True" in caplog.text + assert "cache_mode=disabled" in caplog.text def test_after_error_cleans_up_split_state(): @@ -608,13 +706,515 @@ def test_after_error_cleans_up_split_state(): assert operation_id in hook.executors assert operation_id in hook.coroutines_to_execute - from unstructured_client._hooks.types import AfterErrorContext error_ctx = MagicMock(spec=AfterErrorContext) error_ctx.operation_id = mock_hook_ctx.operation_id - hook.after_error(error_ctx, None, ConnectionError("DNS failed")) + hook.after_error(error_ctx, None, httpx.ConnectError("DNS failed", request=result)) assert operation_id not in hook.executors assert operation_id not in hook.coroutines_to_execute assert operation_id not in hook.operation_timeouts - assert mock_hook_ctx.operation_id not in hook.pending_operation_ids \ No newline at end of file + assert operation_id not in hook.pending_operation_ids + + +@pytest.mark.parametrize( + ("extensions", "expected_timeout"), + [ + ({}, None), + ({"timeout": 42.0}, 42.0), + ], +) +def test_unit_get_request_timeout_seconds_edge_cases(extensions, expected_timeout): + request = httpx.Request("POST", "http://localhost", extensions=extensions) + assert _get_request_timeout_seconds(request) == expected_timeout + + +@pytest.mark.asyncio +async def test_unit_run_tasks_allow_failed_transport_exception(): + tasks = [ + partial(_slow_success_request, content="1"), + partial(_transport_error_request, error_cls=httpx.ReadError, request_id="2"), + partial(_slow_success_request, content="3"), + ] + + responses = await run_tasks(tasks, allow_failed=True) + + assert [response.status_code for _, response in responses] == [200, 500, 200] + assert responses[1][1].extensions["transport_exception"].__class__ is httpx.ReadError + + +@pytest.mark.asyncio +async def test_unit_run_tasks_allow_failed_cancelled_error_treated_as_failure(): + tasks = [ + partial(_slow_success_request, content="1"), + partial(_cancelled_request), + partial(_slow_success_request, content="3"), + ] + + responses = await run_tasks(tasks, allow_failed=True) + + assert [response.status_code for _, response in responses] == [200, 500, 200] + assert isinstance(responses[1][1].extensions["transport_exception"], asyncio.CancelledError) + + +@pytest.mark.asyncio +async def test_unit_run_tasks_disallow_failed_transport_exception_cancels_remaining(): + cancelled_counter = Counter() + + async def _raises_transport_error( + async_client: httpx.AsyncClient, + limiter: asyncio.Semaphore, + ) -> httpx.Response: + raise httpx.ConnectError( + "connect failed", + request=httpx.Request("POST", "http://localhost:8888/chunk/failure"), + ) + + async def _cancelled_task( + async_client: httpx.AsyncClient, + limiter: asyncio.Semaphore, + content: str, + cancelled_counter: Counter, + ) -> httpx.Response: + try: + await asyncio.sleep(0.5) + return _httpx_response(content) + except asyncio.CancelledError: + cancelled_counter.update(["cancelled"]) + raise + + tasks = [ + partial(_raises_transport_error), + *[ + partial(_cancelled_task, content=f"{index}", cancelled_counter=cancelled_counter) + for index in range(2, 20) + ], + ] + + with pytest.raises(httpx.ConnectError): + await run_tasks(tasks, allow_failed=False) + + await asyncio.sleep(0) + assert cancelled_counter["cancelled"] > 0 + + +def test_unit_concurrent_operations_use_independent_state(): + hook = SplitPdfHook() + hook.sdk_init(base_url="http://localhost:8888", client=httpx.Client()) + + hook, _, first_result = _make_hook_with_split_request( + hook=hook, + allow_failed="true", + cache_tmp_data="true", + ) + hook, _, second_result = _make_hook_with_split_request( + hook=hook, + allow_failed="false", + cache_tmp_data="false", + ) + + first_operation_id = first_result.headers["operation_id"] + second_operation_id = second_result.headers["operation_id"] + + assert first_operation_id != second_operation_id + assert hook.allow_failed[first_operation_id] is True + assert hook.allow_failed[second_operation_id] is False + assert hook.cache_tmp_data_feature[first_operation_id] is True + assert hook.cache_tmp_data_feature[second_operation_id] is False + + +def test_unit_after_error_cleans_only_matching_operation_on_transport_failure(): + hook = SplitPdfHook() + hook.sdk_init(base_url="http://localhost:8888", client=httpx.Client()) + + hook, _, first_result = _make_hook_with_split_request(hook=hook, allow_failed="true") + hook, _, second_result = _make_hook_with_split_request(hook=hook, allow_failed="false") + + first_operation_id = first_result.headers["operation_id"] + second_operation_id = second_result.headers["operation_id"] + + error_ctx = MagicMock(spec=AfterErrorContext) + error_ctx.operation_id = "partition" + + hook.after_error( + error_ctx, + None, + httpx.ConnectError("DNS failed", request=first_result), + ) + + assert first_operation_id not in hook.executors + assert first_operation_id not in hook.coroutines_to_execute + assert first_operation_id not in hook.pending_operation_ids + assert second_operation_id in hook.executors + assert second_operation_id in hook.coroutines_to_execute + assert second_operation_id in hook.pending_operation_ids + + +def test_unit_before_request_uses_hook_ctx_timeout_when_request_timeout_missing(): + hook, _, result = _make_hook_with_split_request( + timeout_extension=None, + config_timeout_ms=34_000, + ) + operation_id = result.headers["operation_id"] + + assert hook.operation_timeouts[operation_id] == 34.0 + + +@pytest.mark.asyncio +async def test_unit_before_request_threads_client_retry_config_into_chunk_execution(): + retry_config = RetryConfig( + "backoff", + BackoffStrategy(1, 2, 3.0, 4), + retry_connection_errors=False, + ) + hook, _, result = _make_hook_with_split_request( + retry_config=retry_config, + pdf_chunks=[(io.BytesIO(b"chunk"), 0)], + ) + operation_id = result.headers["operation_id"] + coroutine = hook.coroutines_to_execute[operation_id][0] + + with patch( + "unstructured_client._hooks.custom.request_utils.call_api_async", + new=AsyncMock(return_value=_httpx_json_response([])), + ) as mock_call_api_async: + async with httpx.AsyncClient() as client: + await coroutine(async_client=client, limiter=asyncio.Semaphore(1)) + + assert mock_call_api_async.await_args.kwargs["retry_config"] is retry_config + + +def test_unit_after_success_clears_on_await_elements_exception(): + hook, _, result = _make_hook_with_split_request() + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + + with patch.object(hook, "_await_elements", side_effect=RuntimeError("boom")): + with pytest.raises(RuntimeError): + hook.after_success(success_ctx, response) + + assert operation_id not in hook.executors + assert operation_id not in hook.coroutines_to_execute + assert operation_id not in hook.pending_operation_ids + + +def test_unit_future_timeout_triggers_cleanup(caplog: pytest.LogCaptureFixture): + caplog.set_level(logging.INFO, logger="unstructured-client") + hook, _, result = _make_hook_with_split_request(pdf_chunks=[(io.BytesIO(b"chunk"), 0)]) + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + + fake_future: futures.Future[list[tuple[int, httpx.Response]]] = futures.Future() + + def _raise_timeout(timeout=None): + raise futures.TimeoutError() + + fake_future.result = _raise_timeout # type: ignore[method-assign] + fake_executor = MagicMock() + tempdir = MagicMock() + tempdir.name = "/tmp/test-split-timeout" + loop = MagicMock() + + def _submit_side_effect(*args, **kwargs): + args[1].close() + loop_holder = args[2] + loop_holder["loop"] = loop + return fake_future + + fake_executor.submit.side_effect = _submit_side_effect + hook.executors[operation_id] = fake_executor + hook.tempdirs[operation_id] = tempdir + + with pytest.raises(futures.TimeoutError): + hook.after_success(success_ctx, response) + + assert operation_id not in hook.executors + assert operation_id not in hook.coroutines_to_execute + assert operation_id not in hook.pending_operation_ids + loop.call_soon_threadsafe.assert_called() + tempdir.cleanup.assert_not_called() + fake_executor.shutdown.assert_not_called() + assert f"event=batch_timeout operation_id={operation_id}" in caplog.text + + fake_future.set_exception(futures.CancelledError()) + + tempdir.cleanup.assert_called_once() + fake_executor.shutdown.assert_called_once_with(wait=False, cancel_futures=True) + + +def test_unit_future_timeout_preserves_timeout_when_loop_is_closed( + caplog: pytest.LogCaptureFixture, +): + caplog.set_level(logging.INFO, logger="unstructured-client") + hook, _, result = _make_hook_with_split_request(pdf_chunks=[(io.BytesIO(b"chunk"), 0)]) + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + + fake_future: futures.Future[list[tuple[int, httpx.Response]]] = futures.Future() + + def _raise_timeout(timeout=None): + raise futures.TimeoutError() + + fake_future.result = _raise_timeout # type: ignore[method-assign] + fake_executor = MagicMock() + tempdir = MagicMock() + tempdir.name = "/tmp/test-split-timeout-closed-loop" + loop = MagicMock() + loop.call_soon_threadsafe.side_effect = RuntimeError("Event loop is closed") + + def _submit_side_effect(*args, **kwargs): + loop_holder = args[2] + loop_holder["loop"] = loop + return fake_future + + fake_executor.submit.side_effect = _submit_side_effect + hook.executors[operation_id] = fake_executor + hook.tempdirs[operation_id] = tempdir + + with pytest.raises(futures.TimeoutError): + hook.after_success(success_ctx, response) + + assert "event=loop_closed_during_cancel" in caplog.text + fake_future.set_exception(futures.CancelledError()) + tempdir.cleanup.assert_called_once() + fake_executor.shutdown.assert_called_once_with(wait=False, cancel_futures=True) + + +def test_unit_clear_operation_does_not_raise_when_loop_is_closed(): + hook = SplitPdfHook() + operation_id = "loop-closed-clear-operation" + future: futures.Future[list[tuple[int, httpx.Response]]] = futures.Future() + executor = MagicMock() + tempdir = MagicMock() + loop = MagicMock() + loop.call_soon_threadsafe.side_effect = RuntimeError("Event loop is closed") + + hook.coroutines_to_execute[operation_id] = [MagicMock()] + hook.executors[operation_id] = executor + hook.tempdirs[operation_id] = tempdir + hook.operation_futures[operation_id] = future + hook.operation_loops[operation_id] = {"loop": loop} + + hook._clear_operation(operation_id) + + future.set_result([]) + tempdir.cleanup.assert_called_once() + executor.shutdown.assert_called_once_with(wait=False, cancel_futures=True) + + +@pytest.mark.asyncio +async def test_unit_call_api_async_closes_file_on_exception(): + pdf_chunk_file = MagicMock(spec=io.BufferedReader) + pdf_chunk_file.closed = False + request = httpx.Request("POST", "http://localhost:8888/general/v0/general") + client = AsyncMock(spec=httpx.AsyncClient) + + with patch( + "unstructured_client._hooks.custom.request_utils.retry_async", + new=AsyncMock(side_effect=httpx.ConnectError("boom", request=request)), + ): + with pytest.raises(httpx.ConnectError): + await request_utils.call_api_async( + client=client, + pdf_chunk_request=request, + pdf_chunk_file=pdf_chunk_file, + limiter=asyncio.Semaphore(1), + ) + + pdf_chunk_file.close.assert_called_once() + + +@pytest.mark.asyncio +async def test_unit_call_api_async_logs_chunk_context(caplog: pytest.LogCaptureFixture): + caplog.set_level(logging.DEBUG, logger="unstructured-client") + pdf_chunk_file = io.BytesIO(b"chunk") + request = httpx.Request("POST", "http://localhost:8888/general/v0/general") + client = AsyncMock(spec=httpx.AsyncClient) + + with patch( + "unstructured_client._hooks.custom.request_utils.retry_async", + new=AsyncMock(side_effect=httpx.ConnectError("boom", request=request)), + ): + with pytest.raises(httpx.ConnectError): + await request_utils.call_api_async( + client=client, + pdf_chunk_request=request, + pdf_chunk_file=pdf_chunk_file, + limiter=asyncio.Semaphore(1), + operation_id="op-123", + chunk_index=4, + page_number=17, + ) + + assert "event=chunk_request_error operation_id=op-123 chunk_index=4 page_number=17" in caplog.text + + +def test_unit_allow_failed_partial_results(caplog: pytest.LogCaptureFixture): + caplog.set_level(logging.INFO, logger="unstructured-client") + hook = SplitPdfHook() + operation_id = "allow-failed-partial" + hook.coroutines_to_execute[operation_id] = [partial(_request_mock, fails=False, content="unused")] * 3 + hook.concurrency_level[operation_id] = 3 + hook.allow_failed[operation_id] = True + hook.cache_tmp_data_feature[operation_id] = False + hook.executors[operation_id] = MagicMock() + + fake_future = MagicMock() + fake_future.result.return_value = [ + (1, _httpx_json_response([{"page_number": 1}])), + (2, _httpx_response("boom", status_code=500)), + (3, _httpx_json_response([{"page_number": 3}])), + ] + hook.executors[operation_id].submit.return_value = fake_future + + elements = hook._await_elements(operation_id) + + assert elements == [{"page_number": 1}, {"page_number": 3}] + assert len(hook.api_failed_responses[operation_id]) == 1 + assert f"event=batch_complete operation_id={operation_id}" in caplog.text + assert "success_count=2" in caplog.text + assert "failure_count=1" in caplog.text + + +def test_unit_allow_failed_all_fail_records_failures(): + hook = SplitPdfHook() + operation_id = "allow-failed-all-fail" + hook.coroutines_to_execute[operation_id] = [partial(_request_mock, fails=False, content="unused")] * 2 + hook.concurrency_level[operation_id] = 2 + hook.allow_failed[operation_id] = True + hook.cache_tmp_data_feature[operation_id] = False + hook.executors[operation_id] = MagicMock() + + fake_future = MagicMock() + fake_future.result.return_value = [ + (1, _httpx_response("boom", status_code=500)), + (2, _httpx_response("boom", status_code=500)), + ] + hook.executors[operation_id].submit.return_value = fake_future + + assert hook._await_elements(operation_id) == [] + assert len(hook.api_failed_responses[operation_id]) == 2 + + +def test_unit_allow_failed_after_success_returns_first_failed_response_when_zero_chunks_succeed(): + hook, _, result = _make_hook_with_split_request(allow_failed="true") + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + failed_response = _httpx_response("transport failure", status_code=500) + failed_response.extensions["transport_exception"] = httpx.ConnectError( + "boom", + request=failed_response.request, + ) + hook._annotate_failure_response( + operation_id, + failed_chunk_index=1, + successful_count=0, + failed_count=1, + total_chunks=1, + response=failed_response, + ) + hook.allow_failed[operation_id] = True + hook.api_successful_responses[operation_id] = [] + hook.api_failed_responses[operation_id] = [failed_response] + + with patch.object(hook, "_await_elements", return_value=[]): + returned_response = hook.after_success(success_ctx, response) + + assert returned_response is failed_response + assert returned_response.headers[f"{SPLIT_PDF_HEADER_PREFIX}Operation-Id"] == operation_id + assert returned_response.headers[f"{SPLIT_PDF_HEADER_PREFIX}Chunk-Index"] == "1" + assert returned_response.headers[f"{SPLIT_PDF_HEADER_PREFIX}Success-Count"] == "0" + assert returned_response.headers[f"{SPLIT_PDF_HEADER_PREFIX}Failure-Count"] == "1" + assert returned_response.extensions["split_pdf_failure_metadata"][ + f"{SPLIT_PDF_HEADER_PREFIX}Operation-Id" + ] == operation_id + + +def test_unit_disallow_failed_after_success_returns_first_failed_response(): + hook, _, result = _make_hook_with_split_request() + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + failed_response = _httpx_response("failure", status_code=500) + hook.allow_failed[operation_id] = False + hook.api_failed_responses[operation_id] = [failed_response] + + with patch.object(hook, "_await_elements", return_value=[]): + returned_response = hook.after_success(success_ctx, response) + + assert returned_response is failed_response + + +def test_before_request_failure_after_state_setup_cleans_partial_operation(): + hook = SplitPdfHook() + hook.sdk_init(base_url="http://localhost:8888", client=httpx.Client()) + executor = MagicMock() + tempdir = MagicMock() + tempdir.name = "/tmp/before-request-failure" + hook_ctx = MagicMock(spec=BeforeRequestContext) + hook_ctx.operation_id = "partition" + hook_ctx.config = MagicMock() + hook_ctx.config.timeout_ms = 12_000 + hook_ctx.config.retry_config = UNSET + request = httpx.Request( + "POST", + "http://localhost:8888/general/v0/general", + headers={"Content-Type": "multipart/form-data"}, + extensions={"timeout": {"connect": 12.0, "read": 12.0, "write": 12.0, "pool": 12.0}}, + ) + mock_pdf_file = MagicMock() + mock_form_data = { + "split_pdf_page": "true", + "strategy": "fast", + "split_pdf_cache_tmp_data": "true", + "files": { + "filename": "test.pdf", + "content_type": "application/pdf", + "file": mock_pdf_file, + }, + } + mock_pdf_reader = MagicMock() + mock_pdf_reader.get_num_pages.return_value = 100 + mock_pdf_reader.pages = [MagicMock()] * 100 + mock_pdf_reader.stream = io.BytesIO(b"fake-pdf-bytes") + + def _chunk_paths_side_effect(*args, **kwargs): + hook.tempdirs[kwargs["operation_id"]] = tempdir + return [(Path("/tmp/chunk-1.pdf"), 0)] + + with patch("unstructured_client._hooks.custom.request_utils.get_multipart_stream_fields") as mock_get_fields, \ + patch("unstructured_client._hooks.custom.pdf_utils.read_pdf") as mock_read_pdf, \ + patch("unstructured_client._hooks.custom.pdf_utils.check_pdf") as mock_check_pdf, \ + patch("unstructured_client._hooks.custom.request_utils.get_base_url") as mock_get_base_url, \ + patch("unstructured_client._hooks.custom.split_pdf_hook.futures.ThreadPoolExecutor", return_value=executor), \ + patch("unstructured_client._hooks.custom.request_utils.create_pdf_chunk_request", side_effect=RuntimeError("chunk build failed")), \ + patch.object(hook, "_trim_large_pages", side_effect=lambda pdf, fd: pdf), \ + patch.object(hook, "_get_pdf_chunk_paths", side_effect=_chunk_paths_side_effect), \ + patch.object(hook, "_get_pdf_chunk_files", return_value=[(io.BytesIO(b"chunk"), 0)]): + mock_get_fields.return_value = mock_form_data + mock_read_pdf.return_value = mock_pdf_reader + mock_check_pdf.return_value = mock_pdf_reader + mock_get_base_url.return_value = "http://localhost:8888" + + with pytest.raises(RuntimeError, match="chunk build failed"): + hook.before_request(hook_ctx, request) + + assert hook.coroutines_to_execute == {} + assert hook.executors == {} + assert hook.tempdirs == {} + assert hook.operation_timeouts == {} + assert hook.operation_retry_configs == {} + assert hook.allow_failed == {} + assert hook.cache_tmp_data_feature == {} + assert hook.cache_tmp_data_dir == {} + tempdir.cleanup.assert_called_once() + executor.shutdown.assert_called_once_with(wait=False, cancel_futures=True) \ No newline at end of file diff --git a/gen.yaml b/gen.yaml index 30d27dfb..2f03dd05 100644 --- a/gen.yaml +++ b/gen.yaml @@ -27,7 +27,7 @@ generation: generateNewTests: false skipResponseBodyAssertions: false python: - version: 0.43.0 + version: 0.43.1 additionalDependencies: dev: deepdiff: '>=9.0.0' diff --git a/src/unstructured_client/_hooks/custom/logger_hook.py b/src/unstructured_client/_hooks/custom/logger_hook.py index f142ebc3..8f2451a7 100644 --- a/src/unstructured_client/_hooks/custom/logger_hook.py +++ b/src/unstructured_client/_hooks/custom/logger_hook.py @@ -17,6 +17,7 @@ from collections import defaultdict logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) +SPLIT_HEADER_PREFIX = "X-Unstructured-Split-" class LoggerHook(AfterErrorHook, AfterSuccessHook, SDKInitHook): @@ -25,22 +26,41 @@ class LoggerHook(AfterErrorHook, AfterSuccessHook, SDKInitHook): def __init__(self) -> None: self.retries_counter: DefaultDict[str, int] = defaultdict(int) + @staticmethod + def _split_response_context(response: Optional[httpx.Response]) -> str: + if response is None: + return "" + operation_id = response.headers.get(f"{SPLIT_HEADER_PREFIX}Operation-Id") + chunk_index = response.headers.get(f"{SPLIT_HEADER_PREFIX}Chunk-Index") + success_count = response.headers.get(f"{SPLIT_HEADER_PREFIX}Success-Count") + failure_count = response.headers.get(f"{SPLIT_HEADER_PREFIX}Failure-Count") + if not any([operation_id, chunk_index, success_count, failure_count]): + return "" + return ( + f" split_operation_id={operation_id}" + f" split_chunk_index={chunk_index}" + f" split_success_count={success_count}" + f" split_failure_count={failure_count}" + ) + def log_retries(self, response: Optional[httpx.Response], error: Optional[Exception], operation_id: str,): """Log retries to give users visibility into requests.""" + split_context = self._split_response_context(response) if response is not None and response.status_code // 100 == 5: logger.info( "Failed to process a request due to API server error with status code %d. " - "Attempting retry number %d after sleep.", + "Attempting retry number %d after sleep.%s", response.status_code, self.retries_counter[operation_id], + split_context, ) if response.text: logger.info("Server message - %s", response.text) - elif error is not None and isinstance(error, httpx.ConnectError): + elif error is not None and isinstance(error, httpx.TransportError): logger.info( - "Failed to process a request due to connection error - %s. " + "Failed to process a request due to transport error - %s. " "Attempting retry number %d after sleep.", error, self.retries_counter[operation_id], @@ -79,7 +99,12 @@ def after_error( # a success here when one of the split requests was partitioned successfully return response, error if response: - logger.error("Server responded with %d - %s", response.status_code, response.text) + logger.error( + "Server responded with %d - %s%s", + response.status_code, + response.text, + self._split_response_context(response), + ) if error is not None: logger.error("Following error occurred - %s", error, exc_info=error) return response, error diff --git a/src/unstructured_client/_hooks/custom/request_utils.py b/src/unstructured_client/_hooks/custom/request_utils.py index 7eae42cf..bfc9cb0f 100644 --- a/src/unstructured_client/_hooks/custom/request_utils.py +++ b/src/unstructured_client/_hooks/custom/request_utils.py @@ -4,7 +4,7 @@ import io import json import logging -from typing import Tuple, Any, BinaryIO +from typing import Tuple, Any, BinaryIO, Optional from urllib.parse import urlparse import httpx @@ -34,6 +34,34 @@ logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) +def create_default_retry_config() -> RetryConfig: + one_second = 1000 + one_minute = 1000 * 60 + return RetryConfig( + "backoff", + BackoffStrategy( + initial_interval=one_second * 3, + max_interval=one_minute * 12, + max_elapsed_time=one_minute * 30, + exponent=1.88, + ), + retry_connection_errors=True, + ) + + +def create_split_retry_config(retry_config: Optional[RetryConfig]) -> RetryConfig: + if retry_config is None: + return create_default_retry_config() + + # Split chunk requests run after the top-level dummy request has already + # succeeded, so they must preserve their own transport-level retry budget. + return RetryConfig( + retry_config.strategy, + retry_config.backoff, + True, + ) + + def get_multipart_stream_fields(request: httpx.Request) -> dict[str, Any]: """Extracts the multipart fields from the request. @@ -163,34 +191,48 @@ async def call_api_async( pdf_chunk_request: httpx.Request, pdf_chunk_file: BinaryIO, limiter: asyncio.Semaphore, + retry_config: Optional[RetryConfig] = None, + operation_id: Optional[str] = None, + chunk_index: Optional[int] = None, + page_number: Optional[int] = None, ) -> httpx.Response: - one_second = 1000 - one_minute = 1000 * 60 - - retry_config = RetryConfig( - "backoff", - BackoffStrategy( - initial_interval=one_second * 3, - max_interval=one_minute * 12, - max_elapsed_time=one_minute * 30, - exponent=1.88, - ), - retry_connection_errors=True, - ) - retryable_codes = ["5xx"] + effective_retry_config = create_split_retry_config(retry_config) async def do_request(): return await client.send(pdf_chunk_request) async with limiter: try: + logger.debug( + "split_pdf event=chunk_request_send operation_id=%s chunk_index=%s page_number=%s retry_config_mode=%s retry_connection_errors=%s", + operation_id, + chunk_index, + page_number, + "sdk_custom" if retry_config is not None else "sdk_default_or_unset", + effective_retry_config.retry_connection_errors, + ) response = await retry_async( - do_request, Retries(retry_config, retryable_codes) + do_request, Retries(effective_retry_config, retryable_codes) + ) + logger.debug( + "split_pdf event=chunk_request_response operation_id=%s chunk_index=%s page_number=%s status_code=%d", + operation_id, + chunk_index, + page_number, + response.status_code, ) return response except Exception as e: - logger.error("Request failed with error: %s", e, exc_info=e) + logger.error( + "split_pdf event=chunk_request_error operation_id=%s chunk_index=%s page_number=%s error_type=%s error=%s", + operation_id, + chunk_index, + page_number, + type(e).__name__, + e, + exc_info=e, + ) raise e finally: if not isinstance(pdf_chunk_file, io.BytesIO) and not pdf_chunk_file.closed: diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index 58a8d168..6e29978a 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -5,6 +5,7 @@ import json import logging import math +import time import os import tempfile import uuid @@ -43,6 +44,7 @@ SDKInitHook, ) from unstructured_client.httpclient import HttpClient, AsyncHttpClient +from unstructured_client.utils import RetryConfig logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) @@ -58,6 +60,15 @@ MAX_PAGE_LENGTH = 4000 TIMEOUT_BUFFER_SECONDS = 5 DEFAULT_FUTURE_TIMEOUT_MINUTES = 60 +OPERATION_ID_EXTENSION_KEY = "split_pdf_operation_id" +SPLIT_PDF_HEADER_PREFIX = "X-Unstructured-Split-" + + +class ChunkExecutionError(Exception): + def __init__(self, index: int, inner: BaseException): + super().__init__(str(inner)) + self.index = index + self.inner = inner def _get_request_timeout_seconds(request: httpx.Request) -> Optional[float]: @@ -81,12 +92,23 @@ def _get_request_timeout_seconds(request: httpx.Request) -> Optional[float]: def _run_coroutines_in_separate_thread( coroutines_task: Coroutine[Any, Any, list[tuple[int, httpx.Response]]], + loop_holder: dict[str, Optional[asyncio.AbstractEventLoop]], ) -> list[tuple[int, httpx.Response]]: - return asyncio.run(coroutines_task) + async def runner() -> list[tuple[int, httpx.Response]]: + loop_holder["loop"] = asyncio.get_running_loop() + try: + return await coroutines_task + finally: + loop_holder["loop"] = None + + return asyncio.run(runner()) async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Response]: - response = await coro + try: + response = await coro + except BaseException as exc: + raise ChunkExecutionError(index, exc) from exc return index, response @@ -95,6 +117,7 @@ async def run_tasks( allow_failed: bool = False, concurrency_level: int = 10, client_timeout: Optional[httpx.Timeout] = None, + operation_id: Optional[str] = None, ) -> list[tuple[int, httpx.Response]]: """Run a list of coroutines in parallel and return the results in order. @@ -117,22 +140,88 @@ async def run_tasks( client_timeout_minutes = int(timeout_var) client_timeout = httpx.Timeout(60 * client_timeout_minutes) + logger.debug( + "split_pdf event=batch_async_start operation_id=%s chunk_count=%d concurrency=%d client_timeout=%s allow_failed=%s", + operation_id, + len(coroutines), + concurrency_level, + client_timeout, + allow_failed, + ) + async with httpx.AsyncClient(timeout=client_timeout) as client: armed_coroutines = [coro(async_client=client, limiter=limiter) for coro in coroutines] # type: ignore if allow_failed: - responses = await asyncio.gather(*armed_coroutines, return_exceptions=False) - return list(enumerate(responses, 1)) + responses = await asyncio.gather(*armed_coroutines, return_exceptions=True) + normalized_responses: list[tuple[int, httpx.Response]] = [] + for index, result in enumerate(responses, 1): + if isinstance(result, ChunkExecutionError): + logger.error( + "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", + operation_id, + result.index, + type(result.inner).__name__, + result.inner, + exc_info=result.inner, + ) + normalized_responses.append( + ( + result.index, + _create_transport_error_response(result.inner), + ) + ) + elif isinstance(result, BaseException): + logger.error( + "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", + operation_id, + index, + type(result).__name__, + result, + exc_info=result, + ) + normalized_responses.append((index, _create_transport_error_response(result))) + else: + normalized_responses.append((index, cast(httpx.Response, result))) + return normalized_responses # TODO: replace with asyncio.TaskGroup for python >3.11 # pylint: disable=fixme tasks = [asyncio.create_task(_order_keeper(index, coro)) for index, coro in enumerate(armed_coroutines, 1)] results = [] remaining_tasks = dict(enumerate(tasks, 1)) for future in asyncio.as_completed(tasks): - index, response = await future + try: + index, response = await future + except ChunkExecutionError as exc: + logger.error( + "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", + operation_id, + exc.index, + type(exc.inner).__name__, + exc.inner, + exc_info=exc.inner, + ) + for remaining_task in remaining_tasks.values(): + remaining_task.cancel() + logger.warning( + "split_pdf event=batch_cancel_remaining operation_id=%s reason=transport_exception failed_chunk_index=%d remaining_tasks=%d", + operation_id, + exc.index, + len(remaining_tasks), + ) + if isinstance(exc.inner, Exception): + raise exc.inner + raise RuntimeError("Split PDF chunk cancelled") from exc.inner if response.status_code != 200: # cancel all remaining tasks for remaining_task in remaining_tasks.values(): remaining_task.cancel() + logger.warning( + "split_pdf event=batch_cancel_remaining operation_id=%s reason=http_error failed_chunk_index=%d status_code=%d remaining_tasks=%d", + operation_id, + index, + response.status_code, + len(remaining_tasks), + ) results.append((index, response)) break results.append((index, response)) @@ -142,6 +231,44 @@ async def run_tasks( return sorted(results, key=lambda x: x[0]) +def _create_transport_error_response(error: BaseException) -> httpx.Response: + request = getattr(error, "request", None) + if not isinstance(request, httpx.Request): + request = httpx.Request("GET", "http://split-pdf.invalid") + return httpx.Response( + status_code=500, + request=request, + content=str(error).encode(), + extensions={"transport_exception": error}, + ) + + +def _cancel_running_tasks() -> None: + for task in asyncio.all_tasks(): + if not task.done(): + task.cancel() + + +def _request_task_cancellation( + loop: Optional[asyncio.AbstractEventLoop], + *, + operation_id: str, +) -> bool: + if loop is None: + return False + try: + loop.call_soon_threadsafe(_cancel_running_tasks) + return True + except RuntimeError as exc: + if "Event loop is closed" in str(exc): + logger.warning( + "split_pdf event=loop_closed_during_cancel operation_id=%s", + operation_id, + ) + return False + raise + + def get_optimal_split_size(num_pages: int, concurrency_level: int) -> int: """Distributes pages to workers evenly based on the number of pages and desired concurrency level.""" if num_pages < MAX_PAGES_PER_SPLIT * concurrency_level: @@ -180,7 +307,6 @@ class SplitPdfHook(SDKInitHook, BeforeRequestHook, AfterSuccessHook, AfterErrorH def __init__(self) -> None: self.client: Optional[HttpClient] = None self.partition_base_url: Optional[str] = None - self.is_partition_request: bool = False self.async_client: Optional[AsyncHttpClient] = None self.coroutines_to_execute: dict[ str, list[partial[Coroutine[Any, Any, httpx.Response]]] @@ -189,12 +315,106 @@ def __init__(self) -> None: self.api_successful_responses: dict[str, list[httpx.Response]] = {} self.api_failed_responses: dict[str, list[httpx.Response]] = {} self.executors: dict[str, futures.ThreadPoolExecutor] = {} + self.operation_futures: dict[str, futures.Future[list[tuple[int, httpx.Response]]]] = {} self.tempdirs: dict[str, tempfile.TemporaryDirectory] = {} self.operation_timeouts: dict[str, Optional[float]] = {} + self.operation_retry_configs: dict[str, Optional[RetryConfig]] = {} + self.operation_loops: dict[str, dict[str, Optional[asyncio.AbstractEventLoop]]] = {} self.pending_operation_ids: dict[str, str] = {} - self.allow_failed: bool = DEFAULT_ALLOW_FAILED - self.cache_tmp_data_feature: bool = DEFAULT_CACHE_TMP_DATA - self.cache_tmp_data_dir: str = DEFAULT_CACHE_TMP_DATA_DIR + self.allow_failed: dict[str, bool] = {} + self.cache_tmp_data_feature: dict[str, bool] = {} + self.cache_tmp_data_dir: dict[str, str] = {} + + @staticmethod + def _get_operation_id_from_request(request: Optional[httpx.Request]) -> Optional[str]: + if request is None: + return None + extension_operation_id = request.extensions.get(OPERATION_ID_EXTENSION_KEY) + if isinstance(extension_operation_id, str): + return extension_operation_id + header_operation_id = request.headers.get("operation_id") + if header_operation_id: + return header_operation_id + return None + + def _get_operation_id( + self, + response: Optional[httpx.Response] = None, + error: Optional[Exception] = None, + ) -> Optional[str]: + if response is not None: + operation_id = self._get_operation_id_from_request(response.request) + if operation_id is not None: + return operation_id + + error_request = getattr(error, "request", None) + if isinstance(error_request, httpx.Request): + return self._get_operation_id_from_request(error_request) + + return None + + @staticmethod + def _retry_config_observability_mode(retry_config: Optional[RetryConfig]) -> str: + return "sdk_custom" if retry_config is not None else "sdk_default_or_unset" + + @staticmethod + def _cache_mode_observability_value( + cache_enabled: bool, + cache_dir: str, + ) -> str: + if not cache_enabled: + return "disabled" + if Path(cache_dir).resolve() == Path(DEFAULT_CACHE_TMP_DATA_DIR).resolve(): + return "default" + return "custom" + + @staticmethod + def _is_transport_failure_response(response: httpx.Response) -> bool: + return "transport_exception" in response.extensions + + def _build_split_failure_metadata( + self, + operation_id: str, + *, + failed_chunk_index: int, + successful_count: int, + failed_count: int, + total_chunks: int, + response: httpx.Response, + ) -> dict[str, str]: + metadata = { + f"{SPLIT_PDF_HEADER_PREFIX}Operation-Id": operation_id, + f"{SPLIT_PDF_HEADER_PREFIX}Chunk-Index": str(failed_chunk_index), + f"{SPLIT_PDF_HEADER_PREFIX}Chunk-Count": str(total_chunks), + f"{SPLIT_PDF_HEADER_PREFIX}Success-Count": str(successful_count), + f"{SPLIT_PDF_HEADER_PREFIX}Failure-Count": str(failed_count), + f"{SPLIT_PDF_HEADER_PREFIX}Transport-Failure": str( + self._is_transport_failure_response(response) + ).lower(), + } + return metadata + + def _annotate_failure_response( + self, + operation_id: str, + *, + failed_chunk_index: int, + successful_count: int, + failed_count: int, + total_chunks: int, + response: httpx.Response, + ) -> httpx.Response: + metadata = self._build_split_failure_metadata( + operation_id, + failed_chunk_index=failed_chunk_index, + successful_count=successful_count, + failed_count=failed_count, + total_chunks=total_chunks, + response=response, + ) + response.headers.update(metadata) + response.extensions["split_pdf_failure_metadata"] = metadata + return response def sdk_init( self, base_url: str, client: HttpClient @@ -279,12 +499,9 @@ def before_request( # Actually the general.partition operation overwrites the default client's base url (as # the platform operations do). Here we need to get the base url from the request object. - if hook_ctx.operation_id == "partition": - self.partition_base_url = get_base_url(request.url) - self.is_partition_request = True - else: - self.is_partition_request = False + if hook_ctx.operation_id != "partition": return request + self.partition_base_url = get_base_url(request.url) if self.client is None: logger.warning("HTTP client not accessible! Continuing without splitting.") @@ -329,7 +546,7 @@ def before_request( fallback_value=DEFAULT_STARTING_PAGE_NUMBER, ) - self.allow_failed = form_utils.get_split_pdf_allow_failed_param( + allow_failed = form_utils.get_split_pdf_allow_failed_param( form_data, key=PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY, fallback_value=DEFAULT_ALLOW_FAILED, @@ -342,13 +559,13 @@ def before_request( max_allowed=MAX_CONCURRENCY_LEVEL, ) - self.cache_tmp_data_feature = form_utils.get_split_pdf_cache_tmp_data( + cache_tmp_data_feature = form_utils.get_split_pdf_cache_tmp_data( form_data, key=PARTITION_FORM_SPLIT_CACHE_TMP_DATA_KEY, fallback_value=DEFAULT_CACHE_TMP_DATA, ) - self.cache_tmp_data_dir = form_utils.get_split_pdf_cache_tmp_data_dir( + cache_tmp_data_dir = form_utils.get_split_pdf_cache_tmp_data_dir( form_data, key=PARTITION_FORM_SPLIT_CACHE_TMP_DATA_DIR_KEY, fallback_value=DEFAULT_CACHE_TMP_DATA_DIR, @@ -371,77 +588,141 @@ def before_request( if split_size >= page_count and page_count == len(pdf.pages): return request + self.allow_failed[operation_id] = allow_failed + self.cache_tmp_data_feature[operation_id] = cache_tmp_data_feature + self.cache_tmp_data_dir[operation_id] = cache_tmp_data_dir self.concurrency_level[operation_id] = concurrency_level self.executors[operation_id] = futures.ThreadPoolExecutor(max_workers=1) - pdf = self._trim_large_pages(pdf, form_data) - - pdf.stream.seek(0) - pdf_bytes = pdf.stream.read() + timeout_seconds = _get_request_timeout_seconds(request) + if timeout_seconds is None and hook_ctx.config.timeout_ms is not None: + timeout_seconds = hook_ctx.config.timeout_ms / 1000 + self.operation_timeouts[operation_id] = timeout_seconds + self.operation_retry_configs[operation_id] = ( + hook_ctx.config.retry_config + if isinstance(hook_ctx.config.retry_config, RetryConfig) + else None + ) - if self.cache_tmp_data_feature: - pdf_chunk_paths = self._get_pdf_chunk_paths( - pdf_bytes, - operation_id=operation_id, - split_size=split_size, - page_start=page_range_start, - page_end=page_range_end - ) - # force free PDF object memory - del pdf - pdf_chunks = self._get_pdf_chunk_files(pdf_chunk_paths) - else: - pdf_chunks = self._get_pdf_chunks_in_memory( - pdf_bytes, - split_size=split_size, - page_start=page_range_start, - page_end=page_range_end - ) + try: + pdf = self._trim_large_pages(pdf, form_data) + + pdf.stream.seek(0) + pdf_bytes = pdf.stream.read() + + temp_dir_path = None + if cache_tmp_data_feature: + pdf_chunk_paths = self._get_pdf_chunk_paths( + pdf_bytes, + operation_id=operation_id, + cache_tmp_data_dir=cache_tmp_data_dir, + split_size=split_size, + page_start=page_range_start, + page_end=page_range_end + ) + temp_dir = self.tempdirs.get(operation_id) + temp_dir_path = temp_dir.name if temp_dir is not None else None + # force free PDF object memory + del pdf + pdf_chunks = self._get_pdf_chunk_files(pdf_chunk_paths) + else: + pdf_chunks = self._get_pdf_chunks_in_memory( + pdf_bytes, + split_size=split_size, + page_start=page_range_start, + page_end=page_range_end + ) - self.coroutines_to_execute[operation_id] = [] - set_index = 1 - for pdf_chunk_file, page_index in pdf_chunks: - page_number = page_index + starting_page_number - pdf_chunk_request = request_utils.create_pdf_chunk_request( - form_data=form_data, - pdf_chunk=(pdf_chunk_file, page_number), - filename=pdf_file_meta["filename"], - original_request=request, - ) - # using partial as the shared client parameter must be passed in `run_tasks` function - # in `after_success`. - coroutine = partial( - self.call_api_partial, - operation_id=operation_id, - pdf_chunk_request=pdf_chunk_request, - pdf_chunk_file=pdf_chunk_file, + self.coroutines_to_execute[operation_id] = [] + for pdf_chunk_file, page_index in pdf_chunks: + chunk_index = len(self.coroutines_to_execute[operation_id]) + 1 + page_number = page_index + starting_page_number + pdf_chunk_request = request_utils.create_pdf_chunk_request( + form_data=form_data, + pdf_chunk=(pdf_chunk_file, page_number), + filename=pdf_file_meta["filename"], + original_request=request, + ) + # using partial as the shared client parameter must be passed in `run_tasks` function + # in `after_success`. + coroutine = partial( + self.call_api_partial, + _operation_id=operation_id, + chunk_index=chunk_index, + page_number=page_number, + pdf_chunk_request=pdf_chunk_request, + pdf_chunk_file=pdf_chunk_file, + retry_config=self.operation_retry_configs.get(operation_id), + cache_tmp_data_feature=cache_tmp_data_feature, + temp_dir_path=temp_dir_path, + ) + self.coroutines_to_execute[operation_id].append(coroutine) + + logger.info( + "split_pdf event=plan_created operation_id=%s filename=%s strategy=%s page_range=%s-%s page_count=%d split_size=%d chunk_count=%d concurrency=%d allow_failed=%s cache_mode=%s timeout_seconds=%s retry_config_mode=%s", + operation_id, + Path(pdf_file_meta["filename"]).name, + form_data.get("strategy"), + page_range_start, + page_range_end, + page_count, + split_size, + len(self.coroutines_to_execute[operation_id]), + concurrency_level, + allow_failed, + self._cache_mode_observability_value( + cache_tmp_data_feature, + cache_tmp_data_dir, + ), + timeout_seconds, + self._retry_config_observability_mode( + self.operation_retry_configs.get(operation_id), + ), ) - self.coroutines_to_execute[operation_id].append(coroutine) - set_index += 1 - self.operation_timeouts[operation_id] = _get_request_timeout_seconds(request) - self.pending_operation_ids[hook_ctx.operation_id] = operation_id + self.pending_operation_ids[operation_id] = operation_id - return httpx.Request( - "GET", - f"{self.partition_base_url}/general/docs", - headers={"operation_id": operation_id}, - extensions=request.extensions.copy(), - ) + dummy_request_extensions = request.extensions.copy() + dummy_request_extensions[OPERATION_ID_EXTENSION_KEY] = operation_id + return httpx.Request( + "GET", + f"{self.partition_base_url}/general/docs", + headers={"operation_id": operation_id}, + extensions=dummy_request_extensions, + ) + except Exception: + self._clear_operation(operation_id) + raise async def call_api_partial( self, pdf_chunk_request: httpx.Request, pdf_chunk_file: BinaryIO, limiter: asyncio.Semaphore, - operation_id: str, + _operation_id: str, + chunk_index: int, + page_number: int, async_client: AsyncClient, + retry_config: Optional[RetryConfig], + cache_tmp_data_feature: bool, + temp_dir_path: Optional[str], ) -> httpx.Response: + logger.debug( + "split_pdf event=chunk_start operation_id=%s chunk_index=%d page_number=%d cache_mode=%s", + _operation_id, + chunk_index, + page_number, + "cached" if cache_tmp_data_feature else "memory", + ) response = await request_utils.call_api_async( client=async_client, limiter=limiter, pdf_chunk_request=pdf_chunk_request, pdf_chunk_file=pdf_chunk_file, + retry_config=retry_config, + operation_id=_operation_id, + chunk_index=chunk_index, + page_number=page_number, ) # Immediately delete request to save memory @@ -450,21 +731,37 @@ async def call_api_partial( if response.status_code == 200: - if self.cache_tmp_data_feature: + if cache_tmp_data_feature: + if temp_dir_path is None: + raise RuntimeError("Temp directory path not found for cached split PDF operation") # If we get 200, dump the contents to a file and return the path - temp_dir = self.tempdirs[operation_id] - temp_file_name = f"{temp_dir.name}/{uuid.uuid4()}.json" + temp_file_name = f"{temp_dir_path}/{uuid.uuid4()}.json" async with aiofiles.open(temp_file_name, mode='wb') as temp_file: # Avoid reading the entire response into memory async for bytes_chunk in response.aiter_bytes(): await temp_file.write(bytes_chunk) # we save the path in content attribute to be used in after_success response._content = temp_file_name.encode() # pylint: disable=protected-access + logger.debug( + "split_pdf event=chunk_cached operation_id=%s chunk_index=%d page_number=%d cache_file=%s", + _operation_id, + chunk_index, + page_number, + Path(temp_file_name).name, + ) + + logger.debug( + "split_pdf event=chunk_complete operation_id=%s chunk_index=%d page_number=%d status_code=%d", + _operation_id, + chunk_index, + page_number, + response.status_code, + ) return response def _trim_large_pages(self, pdf: PdfReader, form_data: dict[str, Any]) -> PdfReader: - if form_data['strategy'] != HI_RES_STRATEGY: + if form_data.get("strategy") != HI_RES_STRATEGY: return pdf max_page_length = MAX_PAGE_LENGTH @@ -543,6 +840,7 @@ def _get_pdf_chunk_paths( self, pdf_bytes: bytes, operation_id: str, + cache_tmp_data_dir: str, split_size: int = 1, page_start: int = 1, page_end: Optional[int] = None @@ -569,7 +867,7 @@ def _get_pdf_chunk_paths( # Create temporary directory tempdir = tempfile.TemporaryDirectory( # pylint: disable=consider-using-with - dir=self.cache_tmp_data_dir, + dir=cache_tmp_data_dir, prefix="unstructured_client_" ) self.tempdirs[operation_id] = tempdir @@ -630,6 +928,11 @@ def _await_elements(self, operation_id: str) -> Optional[list]: Waits for the partition requests to complete and returns the flattened elements. + When `split_pdf_allow_failed=True`, chunk-level non-200 responses and + transport failures are recorded in `api_failed_responses` and omitted + from the returned element list. If every chunk fails, the combined + result is an empty list. + Args: operation_id (str): The ID of the operation. @@ -641,14 +944,17 @@ def _await_elements(self, operation_id: str) -> Optional[list]: if tasks is None: return None + started_at = time.perf_counter() concurrency_level = self.concurrency_level.get(operation_id, DEFAULT_CONCURRENCY_LEVEL) timeout_seconds = self.operation_timeouts.get(operation_id) client_timeout = httpx.Timeout(timeout_seconds) if timeout_seconds is not None else None + allow_failed = self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) coroutines = run_tasks( tasks, - allow_failed=self.allow_failed, + allow_failed=allow_failed, concurrency_level=concurrency_level, client_timeout=client_timeout, + operation_id=operation_id, ) # sending the coroutines to a separate thread to avoid blocking the current event loop @@ -656,53 +962,149 @@ def _await_elements(self, operation_id: str) -> Optional[list]: executor = self.executors.get(operation_id) if executor is None: raise RuntimeError("Executor not found for operation_id") - task_responses_future = executor.submit(_run_coroutines_in_separate_thread, coroutines) - - # The per-chunk timeout bounds each HTTP call, but the batch may run in - # multiple waves (ceil(chunks / concurrency)). Scale the outer future - # timeout accordingly so healthy multi-wave batches aren't killed early. - num_waves = max(1, math.ceil(len(tasks) / concurrency_level)) - per_chunk = timeout_seconds or DEFAULT_FUTURE_TIMEOUT_MINUTES * 60 - future_timeout = per_chunk * num_waves + TIMEOUT_BUFFER_SECONDS - task_responses = task_responses_future.result(timeout=future_timeout) + loop_holder: dict[str, Optional[asyncio.AbstractEventLoop]] = {"loop": None} + self.operation_loops[operation_id] = loop_holder + try: + task_responses_future = executor.submit( + _run_coroutines_in_separate_thread, + coroutines, + loop_holder, + ) + self.operation_futures[operation_id] = task_responses_future + + # The per-chunk timeout bounds each HTTP call, but the batch may run in + # multiple waves (ceil(chunks / concurrency)). Scale the outer future + # timeout accordingly so healthy multi-wave batches aren't killed early. + num_waves = max(1, math.ceil(len(tasks) / concurrency_level)) + per_chunk = timeout_seconds or DEFAULT_FUTURE_TIMEOUT_MINUTES * 60 + future_timeout = per_chunk * num_waves + TIMEOUT_BUFFER_SECONDS + logger.info( + "split_pdf event=batch_start operation_id=%s chunk_count=%d concurrency=%d allow_failed=%s client_timeout_seconds=%s future_timeout_seconds=%s num_waves=%d", + operation_id, + len(tasks), + concurrency_level, + allow_failed, + timeout_seconds, + future_timeout, + num_waves, + ) + task_responses = task_responses_future.result(timeout=future_timeout) + except futures.TimeoutError: + loop = loop_holder.get("loop") + logger.error( + "split_pdf event=batch_timeout operation_id=%s chunk_count=%d concurrency=%d allow_failed=%s client_timeout_seconds=%s future_timeout_seconds=%s", + operation_id, + len(tasks), + concurrency_level, + allow_failed, + timeout_seconds, + future_timeout, + ) + cancellation_requested = _request_task_cancellation( + loop, + operation_id=operation_id, + ) + if not cancellation_requested: + coroutines.close() + raise + except Exception: + if loop_holder.get("loop") is None: + coroutines.close() + raise + finally: + if loop_holder.get("loop") is None: + coroutines.close() if task_responses is None: return None successful_responses = [] - failed_responses = [] + failed_responses: list[tuple[int, httpx.Response]] = [] + transport_failure_count = 0 elements = [] for response_number, res in task_responses: if res.status_code == 200: logger.debug( - "Successfully partitioned set #%d, elements added to the final result.", + "split_pdf event=chunk_success operation_id=%s chunk_index=%d", + operation_id, response_number, ) successful_responses.append(res) - if self.cache_tmp_data_feature: + if self.cache_tmp_data_feature.get(operation_id, DEFAULT_CACHE_TMP_DATA): elements.append(load_elements_from_response(res)) else: elements.append(res.json()) else: error_message = f"Failed to partition set {response_number}." - if self.allow_failed: + if self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED): error_message += " Its elements will be omitted from the result." - logger.error(error_message) - failed_responses.append(res) + if self._is_transport_failure_response(res): + transport_failure_count += 1 + logger.error( + "%s operation_id=%s status_code=%d transport_failure=%s", + error_message, + operation_id, + res.status_code, + self._is_transport_failure_response(res), + ) + failed_responses.append((response_number, res)) self.api_successful_responses[operation_id] = successful_responses - self.api_failed_responses[operation_id] = failed_responses + self.api_failed_responses[operation_id] = [response for _, response in failed_responses] + elapsed_ms = round((time.perf_counter() - started_at) * 1000) + logger.info( + "split_pdf event=batch_complete operation_id=%s chunk_count=%d success_count=%d failure_count=%d transport_failure_count=%d elapsed_ms=%d allow_failed=%s", + operation_id, + len(task_responses), + len(successful_responses), + len(failed_responses), + transport_failure_count, + elapsed_ms, + allow_failed, + ) + for failed_chunk_index, response in failed_responses: + self._annotate_failure_response( + operation_id, + failed_chunk_index=failed_chunk_index, + successful_count=len(successful_responses), + failed_count=len(failed_responses), + total_chunks=len(task_responses), + response=response, + ) flattened_elements = [element for sublist in elements for element in sublist] return flattened_elements + @staticmethod + def _finalize_operation_resources( + executor: Optional[futures.ThreadPoolExecutor], + tempdir: Optional[tempfile.TemporaryDirectory], + operation_id: Optional[str] = None, + ) -> None: + if executor is not None: + executor.shutdown(wait=False, cancel_futures=True) + if tempdir is not None: + tempdir.cleanup() + logger.debug( + "split_pdf event=resources_finalized operation_id=%s executor_shutdown=%s tempdir_cleaned=%s", + operation_id, + executor is not None, + tempdir is not None, + ) + def after_success( self, hook_ctx: AfterSuccessContext, response: httpx.Response ) -> Union[httpx.Response, Exception]: """Executes after a successful API request. Awaits all parallel requests and combines the responses into a single response object. + Partial-failure policy: + - `allow_failed=False`: return the first failed chunk response. + - `allow_failed=True`: return a synthetic 200 response containing only + successful chunk elements when at least one chunk succeeds; if no + chunk succeeds, return the first failed chunk response. + Args: hook_ctx (AfterSuccessContext): The context object containing information about the hook execution. @@ -714,17 +1116,33 @@ def after_success( combined response object; otherwise, the original response. Can return exception if it ocurred during the execution. """ - if not self.is_partition_request: + operation_id = self._get_operation_id(response=response) + if operation_id is None or operation_id not in self.coroutines_to_execute: return response - # Grab the correct id out of the dummy request - operation_id = response.request.headers.get("operation_id") - self.pending_operation_ids.pop(hook_ctx.operation_id, None) try: elements = self._await_elements(operation_id) # if fails are disallowed, return the first failed response - if not self.allow_failed and self.api_failed_responses.get(operation_id): + if ( + not self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) + and self.api_failed_responses.get(operation_id) + ): + logger.warning( + "split_pdf event=top_level_failure operation_id=%s mode=strict failed_response_selected=true", + operation_id, + ) + return self.api_failed_responses[operation_id][0] + + if ( + self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) + and not self.api_successful_responses.get(operation_id) + and self.api_failed_responses.get(operation_id) + ): + logger.warning( + "split_pdf event=top_level_failure operation_id=%s mode=allow_failed reason=no_successful_chunks", + operation_id, + ) return self.api_failed_responses[operation_id][0] if elements is None: @@ -748,8 +1166,14 @@ def after_error( we must release the executor, temp files, and coroutine list that were allocated for that operation. """ - operation_id = self.pending_operation_ids.pop(hook_ctx.operation_id, None) + operation_id = self._get_operation_id(response=response, error=error) if operation_id is not None: + logger.warning( + "split_pdf event=after_error_cleanup operation_id=%s response_present=%s error_type=%s", + operation_id, + response is not None, + type(error).__name__ if error is not None else None, + ) self._clear_operation(operation_id) return (response, error) @@ -765,9 +1189,41 @@ def _clear_operation(self, operation_id: str) -> None: self.api_failed_responses.pop(operation_id, None) self.concurrency_level.pop(operation_id, None) self.operation_timeouts.pop(operation_id, None) + self.operation_retry_configs.pop(operation_id, None) + self.allow_failed.pop(operation_id, None) + self.cache_tmp_data_feature.pop(operation_id, None) + self.cache_tmp_data_dir.pop(operation_id, None) + self.pending_operation_ids.pop(operation_id, None) + future = self.operation_futures.pop(operation_id, None) + loop_holder = self.operation_loops.pop(operation_id, None) executor = self.executors.pop(operation_id, None) - if executor is not None: - executor.shutdown(wait=False, cancel_futures=True) tempdir = self.tempdirs.pop(operation_id, None) - if tempdir: - tempdir.cleanup() + logger.debug( + "split_pdf event=clear_operation operation_id=%s has_future=%s future_done=%s has_executor=%s has_tempdir=%s", + operation_id, + future is not None, + future.done() if future is not None else None, + executor is not None, + tempdir is not None, + ) + if future is not None and not future.done(): + loop = loop_holder.get("loop") if loop_holder is not None else None + cancellation_requested = _request_task_cancellation( + loop, + operation_id=operation_id, + ) + if not cancellation_requested: + logger.warning( + "split_pdf event=clear_operation_deferred_no_loop operation_id=%s reason=worker_still_running", + operation_id, + ) + else: + logger.warning( + "split_pdf event=clear_operation_deferred operation_id=%s reason=worker_still_running", + operation_id, + ) + future.add_done_callback( + lambda _: self._finalize_operation_resources(executor, tempdir, operation_id) + ) + return + self._finalize_operation_resources(executor, tempdir, operation_id) diff --git a/src/unstructured_client/_version.py b/src/unstructured_client/_version.py index 0b6f38e9..92998fc8 100644 --- a/src/unstructured_client/_version.py +++ b/src/unstructured_client/_version.py @@ -3,10 +3,10 @@ import importlib.metadata __title__: str = "unstructured-client" -__version__: str = "0.43.0" +__version__: str = "0.43.1" __openapi_doc_version__: str = "1.2.31" __gen_version__: str = "2.680.0" -__user_agent__: str = "speakeasy-sdk/python 0.43.0 2.680.0 1.2.31 unstructured-client" +__user_agent__: str = "speakeasy-sdk/python 0.43.1 2.680.0 1.2.31 unstructured-client" try: if __package__ is not None: