diff --git a/RELEASES.md b/RELEASES.md index f1b5ba73..80c317db 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -1180,4 +1180,14 @@ Based on: ### Generated - [python v0.42.9] . ### Releases -- [PyPI v0.42.9] https://pypi.org/project/unstructured-client/0.42.9 - . \ No newline at end of file +- [PyPI v0.42.9] https://pypi.org/project/unstructured-client/0.42.9 - . + +## 2026-02-03 16:49:02 +### Changes +Based on: +- OpenAPI Doc +- Speakeasy CLI 1.601.0 (2.680.0) https://github.com/speakeasy-api/speakeasy +### Generated +- [python v0.42.10] . +### Releases +- [PyPI v0.42.10] https://pypi.org/project/unstructured-client/0.42.10 - . \ No newline at end of file diff --git a/_test_unstructured_client/unit/test_split_pdf_hook.py b/_test_unstructured_client/unit/test_split_pdf_hook.py index 255d47bf..43abced2 100644 --- a/_test_unstructured_client/unit/test_split_pdf_hook.py +++ b/_test_unstructured_client/unit/test_split_pdf_hook.py @@ -525,4 +525,189 @@ def test_before_request_raises_pdf_validation_error_when_pdf_check_fails(): # Verify that the mocked functions were called as expected mock_get_fields.assert_called_once_with(mock_request) mock_read_pdf.assert_called_once_with(mock_pdf_file) - mock_check_pdf.assert_called_once_with(mock_pdf_reader) \ No newline at end of file + mock_check_pdf.assert_called_once_with(mock_pdf_reader) + + +def test_per_request_settings_isolation(): + """Test that multiple concurrent requests have isolated settings. + + This validates the fix for race conditions where instance-level settings + would be shared between concurrent requests, causing one request to use + another's configuration. + """ + hook = SplitPdfHook() + + # Simulate two different operations with different settings + operation_id_1 = "op-1" + operation_id_2 = "op-2" + + # Set different settings for each operation + hook.allow_failed[operation_id_1] = True + hook.cache_tmp_data_feature[operation_id_1] = True + hook.cache_tmp_data_dir[operation_id_1] = "/tmp/op1" + hook.concurrency_level[operation_id_1] = 5 + + hook.allow_failed[operation_id_2] = False + hook.cache_tmp_data_feature[operation_id_2] = False + hook.cache_tmp_data_dir[operation_id_2] = "/tmp/op2" + hook.concurrency_level[operation_id_2] = 10 + + # Verify that each operation has its own isolated settings + assert hook.allow_failed[operation_id_1] is True + assert hook.allow_failed[operation_id_2] is False + + assert hook.cache_tmp_data_feature[operation_id_1] is True + assert hook.cache_tmp_data_feature[operation_id_2] is False + + assert hook.cache_tmp_data_dir[operation_id_1] == "/tmp/op1" + assert hook.cache_tmp_data_dir[operation_id_2] == "/tmp/op2" + + assert hook.concurrency_level[operation_id_1] == 5 + assert hook.concurrency_level[operation_id_2] == 10 + + +def test_per_request_settings_cleanup(): + """Test that per-request settings are properly cleaned up after operation completes.""" + hook = SplitPdfHook() + + operation_id = "test-op" + + # Set up operation data + hook.allow_failed[operation_id] = True + hook.cache_tmp_data_feature[operation_id] = True + hook.cache_tmp_data_dir[operation_id] = "/tmp/test" + hook.concurrency_level[operation_id] = 8 + hook.coroutines_to_execute[operation_id] = [] + hook.api_successful_responses[operation_id] = [] + hook.api_failed_responses[operation_id] = [] + + # Verify data exists + assert operation_id in hook.allow_failed + assert operation_id in hook.cache_tmp_data_feature + assert operation_id in hook.cache_tmp_data_dir + assert operation_id in hook.concurrency_level + + # Clear the operation + hook._clear_operation(operation_id) + + # Verify all data is cleaned up + assert operation_id not in hook.allow_failed + assert operation_id not in hook.cache_tmp_data_feature + assert operation_id not in hook.cache_tmp_data_dir + assert operation_id not in hook.concurrency_level + assert operation_id not in hook.coroutines_to_execute + assert operation_id not in hook.api_successful_responses + assert operation_id not in hook.api_failed_responses + + +@pytest.mark.asyncio +async def test_concurrent_async_operations_isolation(): + """Test that concurrent async operations maintain isolated settings. + + This simulates the real-world scenario where multiple partition_async + calls are made concurrently and ensures they don't interfere with each other. + """ + hook = SplitPdfHook() + + # Track which settings each operation saw + operation_settings = {} + + async def simulate_operation(op_id: str, allow_failed: bool, cache_enabled: bool): + """Simulate an operation that sets and reads its own settings.""" + # Set operation-specific settings + hook.allow_failed[op_id] = allow_failed + hook.cache_tmp_data_feature[op_id] = cache_enabled + hook.concurrency_level[op_id] = 5 + + # Simulate some async work + await asyncio.sleep(0.01) + + # Read back settings and verify they haven't changed + operation_settings[op_id] = { + 'allow_failed': hook.allow_failed.get(op_id), + 'cache_enabled': hook.cache_tmp_data_feature.get(op_id), + 'concurrency_level': hook.concurrency_level.get(op_id), + } + + # Clean up + hook._clear_operation(op_id) + + # Run multiple operations concurrently with different settings + tasks = [ + simulate_operation("op-1", True, True), + simulate_operation("op-2", False, False), + simulate_operation("op-3", True, False), + simulate_operation("op-4", False, True), + ] + + await asyncio.gather(*tasks) + + # Verify each operation saw its own settings correctly + assert operation_settings["op-1"] == {'allow_failed': True, 'cache_enabled': True, 'concurrency_level': 5} + assert operation_settings["op-2"] == {'allow_failed': False, 'cache_enabled': False, 'concurrency_level': 5} + assert operation_settings["op-3"] == {'allow_failed': True, 'cache_enabled': False, 'concurrency_level': 5} + assert operation_settings["op-4"] == {'allow_failed': False, 'cache_enabled': True, 'concurrency_level': 5} + + +@pytest.mark.asyncio +async def test_await_elements_uses_operation_settings(): + """Test that _await_elements correctly uses per-operation settings.""" + hook = SplitPdfHook() + + operation_id = "test-op" + + # Set operation-specific settings + hook.allow_failed[operation_id] = True + hook.cache_tmp_data_feature[operation_id] = False + hook.concurrency_level[operation_id] = 3 + + # Mock the coroutines to execute + async def mock_coroutine(async_client, limiter): + """Mock coroutine that returns a successful response.""" + response = httpx.Response( + status_code=200, + json=[{"element": "test"}], + ) + return response + + hook.coroutines_to_execute[operation_id] = [ + partial(mock_coroutine) + ] + + # Mock run_tasks to verify it receives the correct settings + with patch("unstructured_client._hooks.custom.split_pdf_hook.run_tasks") as mock_run_tasks: + mock_run_tasks.return_value = [(1, httpx.Response( + status_code=200, + content=b'[{"element": "test"}]', + ))] + + await hook._await_elements(operation_id) + + # Verify run_tasks was called with the operation-specific settings + mock_run_tasks.assert_called_once() + call_args = mock_run_tasks.call_args + + # Check that allow_failed matches what we set + assert call_args.kwargs['allow_failed'] is True + assert call_args.kwargs['concurrency_level'] == 3 + + +def test_default_values_used_when_operation_not_found(): + """Test that default values are used when operation_id is not in the settings dicts.""" + hook = SplitPdfHook() + + # Don't set any values for this operation + operation_id = "missing-op" + + # Access settings with .get() should return defaults + from unstructured_client._hooks.custom.split_pdf_hook import ( + DEFAULT_ALLOW_FAILED, + DEFAULT_CACHE_TMP_DATA, + DEFAULT_CACHE_TMP_DATA_DIR, + DEFAULT_CONCURRENCY_LEVEL, + ) + + assert hook.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) == DEFAULT_ALLOW_FAILED + assert hook.cache_tmp_data_feature.get(operation_id, DEFAULT_CACHE_TMP_DATA) == DEFAULT_CACHE_TMP_DATA + assert hook.cache_tmp_data_dir.get(operation_id, DEFAULT_CACHE_TMP_DATA_DIR) == DEFAULT_CACHE_TMP_DATA_DIR + assert hook.concurrency_level.get(operation_id, DEFAULT_CONCURRENCY_LEVEL) == DEFAULT_CONCURRENCY_LEVEL \ No newline at end of file diff --git a/gen.yaml b/gen.yaml index 42197469..f85e8e9c 100644 --- a/gen.yaml +++ b/gen.yaml @@ -23,7 +23,7 @@ generation: schemas: allOfMergeStrategy: shallowMerge python: - version: 0.42.10 + version: 0.43.0 additionalDependencies: dev: deepdiff: '>=6.0' diff --git a/poetry.lock b/poetry.lock index 772d6bae..ae31a04f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.3.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. [[package]] name = "aiofiles" @@ -959,7 +959,7 @@ files = [ ] [package.dependencies] -astroid = ">=3.2.2,<=3.3.0.dev0" +astroid = ">=3.2.2,<=3.3.0-dev0" colorama = {version = ">=0.4.5", markers = "sys_platform == \"win32\""} dill = [ {version = ">=0.2", markers = "python_version < \"3.11\""}, diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index 605a1f54..da09b379 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -9,7 +9,6 @@ import tempfile import uuid from collections.abc import Awaitable -from concurrent import futures from functools import partial from pathlib import Path from typing import Any, Coroutine, Optional, Tuple, Union, cast, Generator, BinaryIO @@ -38,6 +37,9 @@ AfterErrorHook, AfterSuccessContext, AfterSuccessHook, + AsyncAfterErrorHook, + AsyncAfterSuccessHook, + AsyncBeforeRequestHook, BeforeRequestContext, BeforeRequestHook, SDKInitHook, @@ -57,12 +59,6 @@ HI_RES_STRATEGY = 'hi_res' MAX_PAGE_LENGTH = 4000 -def _run_coroutines_in_separate_thread( - coroutines_task: Coroutine[Any, Any, list[tuple[int, httpx.Response]]], -) -> list[tuple[int, httpx.Response]]: - return asyncio.run(coroutines_task) - - async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Response]: response = await coro return index, response @@ -128,7 +124,7 @@ def get_optimal_split_size(num_pages: int, concurrency_level: int) -> int: return max(split_size, MIN_PAGES_PER_SPLIT) -def load_elements_from_response(response: httpx.Response) -> list[dict]: +async def load_elements_from_response(response: httpx.Response) -> list[dict]: """Loads elements from the response content - the response was modified to keep the path for the json file that should be loaded and returned @@ -139,11 +135,20 @@ def load_elements_from_response(response: httpx.Response) -> list[dict]: Returns: list[dict]: The elements loaded from the response content cached in the json file. """ - with open(response.text, mode="r", encoding="utf-8") as file: - return json.load(file) + async with aiofiles.open(response.text, mode="r", encoding="utf-8") as file: + content = await file.read() + return json.loads(content) -class SplitPdfHook(SDKInitHook, BeforeRequestHook, AfterSuccessHook, AfterErrorHook): +class SplitPdfHook( + SDKInitHook, + BeforeRequestHook, + AfterSuccessHook, + AfterErrorHook, + AsyncBeforeRequestHook, + AsyncAfterSuccessHook, + AsyncAfterErrorHook, +): """ A hook class that splits a PDF file into multiple pages and sends each page as a separate request. This hook is designed to be used with an Speakeasy SDK. @@ -164,11 +169,11 @@ def __init__(self) -> None: self.concurrency_level: dict[str, int] = {} self.api_successful_responses: dict[str, list[httpx.Response]] = {} self.api_failed_responses: dict[str, list[httpx.Response]] = {} - self.executors: dict[str, futures.ThreadPoolExecutor] = {} self.tempdirs: dict[str, tempfile.TemporaryDirectory] = {} - self.allow_failed: bool = DEFAULT_ALLOW_FAILED - self.cache_tmp_data_feature: bool = DEFAULT_CACHE_TMP_DATA - self.cache_tmp_data_dir: str = DEFAULT_CACHE_TMP_DATA_DIR + # Store per-operation settings to avoid race conditions with concurrent requests + self.allow_failed: dict[str, bool] = {} + self.cache_tmp_data_feature: dict[str, bool] = {} + self.cache_tmp_data_dir: dict[str, str] = {} def sdk_init( self, base_url: str, client: HttpClient @@ -303,7 +308,7 @@ def before_request( fallback_value=DEFAULT_STARTING_PAGE_NUMBER, ) - self.allow_failed = form_utils.get_split_pdf_allow_failed_param( + self.allow_failed[operation_id] = form_utils.get_split_pdf_allow_failed_param( form_data, key=PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY, fallback_value=DEFAULT_ALLOW_FAILED, @@ -316,16 +321,13 @@ def before_request( max_allowed=MAX_CONCURRENCY_LEVEL, ) - executor = futures.ThreadPoolExecutor(max_workers=1) - self.executors[operation_id] = executor - - self.cache_tmp_data_feature = form_utils.get_split_pdf_cache_tmp_data( + self.cache_tmp_data_feature[operation_id] = form_utils.get_split_pdf_cache_tmp_data( form_data, key=PARTITION_FORM_SPLIT_CACHE_TMP_DATA_KEY, fallback_value=DEFAULT_CACHE_TMP_DATA, ) - self.cache_tmp_data_dir = form_utils.get_split_pdf_cache_tmp_data_dir( + self.cache_tmp_data_dir[operation_id] = form_utils.get_split_pdf_cache_tmp_data_dir( form_data, key=PARTITION_FORM_SPLIT_CACHE_TMP_DATA_DIR_KEY, fallback_value=DEFAULT_CACHE_TMP_DATA_DIR, @@ -353,7 +355,7 @@ def before_request( pdf.stream.seek(0) pdf_bytes = pdf.stream.read() - if self.cache_tmp_data_feature: + if self.cache_tmp_data_feature.get(operation_id, DEFAULT_CACHE_TMP_DATA): pdf_chunk_paths = self._get_pdf_chunk_paths( pdf_bytes, operation_id=operation_id, @@ -428,16 +430,17 @@ async def call_api_partial( if response.status_code == 200: - if self.cache_tmp_data_feature: + if self.cache_tmp_data_feature.get(operation_id, DEFAULT_CACHE_TMP_DATA): # If we get 200, dump the contents to a file and return the path - temp_dir = self.tempdirs[operation_id] - temp_file_name = f"{temp_dir.name}/{uuid.uuid4()}.json" - async with aiofiles.open(temp_file_name, mode='wb') as temp_file: - # Avoid reading the entire response into memory - async for bytes_chunk in response.aiter_bytes(): - await temp_file.write(bytes_chunk) - # we save the path in content attribute to be used in after_success - response._content = temp_file_name.encode() # pylint: disable=protected-access + temp_dir = self.tempdirs.get(operation_id) + if temp_dir: # Ensure tempdir still exists + temp_file_name = f"{temp_dir.name}/{uuid.uuid4()}.json" + async with aiofiles.open(temp_file_name, mode='wb') as temp_file: + # Avoid reading the entire response into memory + async for bytes_chunk in response.aiter_bytes(): + await temp_file.write(bytes_chunk) + # we save the path in content attribute to be used in after_success + response._content = temp_file_name.encode() # pylint: disable=protected-access return response @@ -546,8 +549,9 @@ def _get_pdf_chunk_paths( offset_end = page_end if page_end else len(pdf) # Create temporary directory + cache_dir = self.cache_tmp_data_dir.get(operation_id, DEFAULT_CACHE_TMP_DATA_DIR) tempdir = tempfile.TemporaryDirectory( # pylint: disable=consider-using-with - dir=self.cache_tmp_data_dir, + dir=cache_dir, prefix="unstructured_client_" ) self.tempdirs[operation_id] = tempdir @@ -603,7 +607,7 @@ def _get_pdf_chunk_files( raise yield pdf_chunk_file, offset - def _await_elements(self, operation_id: str) -> Optional[list]: + async def _await_elements(self, operation_id: str) -> Optional[list]: """ Waits for the partition requests to complete and returns the flattened elements. @@ -620,15 +624,8 @@ def _await_elements(self, operation_id: str) -> Optional[list]: return None concurrency_level = self.concurrency_level.get(operation_id, DEFAULT_CONCURRENCY_LEVEL) - coroutines = run_tasks(tasks, allow_failed=self.allow_failed, concurrency_level=concurrency_level) - - # sending the coroutines to a separate thread to avoid blocking the current event loop - # this operation should be removed when the SDK is updated to support async hooks - executor = self.executors.get(operation_id) - if executor is None: - raise RuntimeError("Executor not found for operation_id") - task_responses_future = executor.submit(_run_coroutines_in_separate_thread, coroutines) - task_responses = task_responses_future.result() + allow_failed = self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) + task_responses = await run_tasks(tasks, allow_failed=allow_failed, concurrency_level=concurrency_level) if task_responses is None: return None @@ -636,6 +633,9 @@ def _await_elements(self, operation_id: str) -> Optional[list]: successful_responses = [] failed_responses = [] elements = [] + cache_enabled = self.cache_tmp_data_feature.get(operation_id, DEFAULT_CACHE_TMP_DATA) + allow_failed = self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) + for response_number, res in task_responses: if res.status_code == 200: logger.debug( @@ -643,14 +643,14 @@ def _await_elements(self, operation_id: str) -> Optional[list]: response_number, ) successful_responses.append(res) - if self.cache_tmp_data_feature: - elements.append(load_elements_from_response(res)) + if cache_enabled: + elements.append(await load_elements_from_response(res)) else: elements.append(res.json()) else: error_message = f"Failed to partition set {response_number}." - if self.allow_failed: + if allow_failed: error_message += " Its elements will be omitted from the result." logger.error(error_message) @@ -664,7 +664,226 @@ def _await_elements(self, operation_id: str) -> Optional[list]: def after_success( self, hook_ctx: AfterSuccessContext, response: httpx.Response ) -> Union[httpx.Response, Exception]: - """Executes after a successful API request. Awaits all parallel requests and + """Executes after a successful API request in sync context. + + Note: This is only used by the sync partition() method. The split PDF hook + doesn't support sync operations since it needs async for parallel requests. + For async operations, use partition_async which calls after_success_async. + + Args: + hook_ctx (AfterSuccessContext): The context object containing information + about the hook execution. + response (httpx.Response): The response object from the SDK call. + + Returns: + Union[httpx.Response, Exception]: The original response since sync hook + doesn't support PDF splitting with parallel requests. + """ + # The sync version doesn't support split PDF operations + # Those only work with the async version + return response + + def after_error( + self, + hook_ctx: AfterErrorContext, + response: Optional[httpx.Response], + error: Optional[Exception], + ) -> Union[Tuple[Optional[httpx.Response], Optional[Exception]], Exception]: + """This hook is unused. In the before hook, we return a mock request + for the SDK to run. This will take us right to the after_success hook + to await the split results. + + Args: + hook_ctx (AfterErrorContext): The AfterErrorContext object containing + information about the hook context. + response (Optional[httpx.Response]): The Response object representing + the response received before the exception occurred. + error (Optional[Exception]): The exception object that was thrown. + + Returns: + Union[Tuple[Optional[httpx.Response], Optional[Exception]], Exception]: + """ + return (response, error) + + async def before_request_async( + self, hook_ctx: BeforeRequestContext, request: httpx.Request + ) -> Union[httpx.Request, Exception]: + """If `splitPdfPage` is set to `true` in the request, the PDF file is split into + separate pages. Each page is sent as a separate request in parallel. The last + page request is returned by this method. It will return the original request + when: `splitPdfPage` is set to `false`, the file is not a PDF, or the HTTP + has not been initialized. + + Note: The preparation work (PDF splitting, building requests) is CPU-bound + and doesn't benefit from async, but this method is async to fit the hook interface. + + Args: + hook_ctx (BeforeRequestContext): The hook context containing information about + the operation. + request (httpx.PreparedRequest): The request object. + + Returns: + Union[httpx.PreparedRequest, Exception]: If `splitPdfPage` is set to `true`, + the last page request; otherwise, the original request. + """ + + # Actually the general.partition operation overwrites the default client's base url (as + # the platform operations do). Here we need to get the base url from the request object. + if hook_ctx.operation_id == "partition": + self.partition_base_url = get_base_url(request.url) + self.is_partition_request = True + else: + self.is_partition_request = False + return request + + if self.client is None: + logger.warning("HTTP client not accessible! Continuing without splitting.") + return request + + # This is our key into coroutines_to_execute + # We need to pass it on to after_success so + # we know which results are ours + operation_id = str(uuid.uuid4()) + + content_type = request.headers.get("Content-Type") + if content_type is None: + return request + + form_data = request_utils.get_multipart_stream_fields(request) + if not form_data: + return request + + split_pdf_page = form_data.get(PARTITION_FORM_SPLIT_PDF_PAGE_KEY) + if split_pdf_page is None or split_pdf_page == "false": + return request + + pdf_file_meta = form_data.get(PARTITION_FORM_FILES_KEY) + if ( + pdf_file_meta is None or not all(metadata in pdf_file_meta for metadata in + ["filename", "content_type", "file"]) + ): + return request + pdf_file = pdf_file_meta.get("file") + if pdf_file is None: + return request + + pdf = pdf_utils.read_pdf(pdf_file) + if pdf is None: + return request + + pdf = pdf_utils.check_pdf(pdf) + + starting_page_number = form_utils.get_starting_page_number( + form_data, + key=PARTITION_FORM_STARTING_PAGE_NUMBER_KEY, + fallback_value=DEFAULT_STARTING_PAGE_NUMBER, + ) + + self.allow_failed[operation_id] = form_utils.get_split_pdf_allow_failed_param( + form_data, + key=PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY, + fallback_value=DEFAULT_ALLOW_FAILED, + ) + + self.concurrency_level[operation_id] = form_utils.get_split_pdf_concurrency_level_param( + form_data, + key=PARTITION_FORM_CONCURRENCY_LEVEL_KEY, + fallback_value=DEFAULT_CONCURRENCY_LEVEL, + max_allowed=MAX_CONCURRENCY_LEVEL, + ) + + self.cache_tmp_data_feature[operation_id] = form_utils.get_split_pdf_cache_tmp_data( + form_data, + key=PARTITION_FORM_SPLIT_CACHE_TMP_DATA_KEY, + fallback_value=DEFAULT_CACHE_TMP_DATA, + ) + + self.cache_tmp_data_dir[operation_id] = form_utils.get_split_pdf_cache_tmp_data_dir( + form_data, + key=PARTITION_FORM_SPLIT_CACHE_TMP_DATA_DIR_KEY, + fallback_value=DEFAULT_CACHE_TMP_DATA_DIR, + ) + + page_range_start, page_range_end = form_utils.get_page_range( + form_data, + key=PARTITION_FORM_PAGE_RANGE_KEY.replace("[]", ""), + max_pages=pdf.get_num_pages(), + ) + + page_count = page_range_end - page_range_start + 1 + + split_size = get_optimal_split_size( + num_pages=page_count, concurrency_level=self.concurrency_level[operation_id] + ) + + # If the doc is small enough, and we aren't slicing it with a page range: + # do not split, just continue with the original request + if split_size >= page_count and page_count == len(pdf.pages): + return request + + pdf = self._trim_large_pages(pdf, form_data) + + pdf.stream.seek(0) + pdf_bytes = pdf.stream.read() + + if self.cache_tmp_data_feature.get(operation_id, DEFAULT_CACHE_TMP_DATA): + pdf_chunk_paths = self._get_pdf_chunk_paths( + pdf_bytes, + operation_id=operation_id, + split_size=split_size, + page_start=page_range_start, + page_end=page_range_end + ) + # force free PDF object memory + del pdf + pdf_chunks = self._get_pdf_chunk_files(pdf_chunk_paths) + else: + pdf_chunks = self._get_pdf_chunks_in_memory( + pdf_bytes, + split_size=split_size, + page_start=page_range_start, + page_end=page_range_end + ) + + self.coroutines_to_execute[operation_id] = [] + set_index = 1 + for pdf_chunk_file, page_index in pdf_chunks: + page_number = page_index + starting_page_number + pdf_chunk_request = request_utils.create_pdf_chunk_request( + form_data=form_data, + pdf_chunk=(pdf_chunk_file, page_number), + filename=pdf_file_meta["filename"], + original_request=request, + ) + # using partial as the shared client parameter must be passed in `run_tasks` function + # in `after_success`. + coroutine = partial( + self.call_api_partial, + operation_id=operation_id, + pdf_chunk_request=pdf_chunk_request, + pdf_chunk_file=pdf_chunk_file, + ) + self.coroutines_to_execute[operation_id].append(coroutine) + set_index += 1 + + # Return a dummy request for the SDK to use + # This allows us to skip right to the AfterRequestHook and await all the calls + # Also, pass the operation_id so after_success can await the right results + + # Note: We need access to the async_client from the sdk_init hook in order to set + # up a mock request like this. + # For now, just make an extra request against our api, which should return 200. + # dummy_request = httpx.Request("GET", "http://no-op") + return httpx.Request( + "GET", + f"{self.partition_base_url}/general/docs", + headers={"operation_id": operation_id}, + ) + + async def after_success_async( + self, hook_ctx: AfterSuccessContext, response: httpx.Response + ) -> Union[httpx.Response, Exception]: + """Async version of after_success. Awaits all parallel requests and combines the responses into a single response object. Args: @@ -676,7 +895,7 @@ def after_success( Returns: Union[httpx.Response, Exception]: If requests were run in parallel, a combined response object; otherwise, the original response. Can return - exception if it ocurred during the execution. + exception if it occurred during the execution. """ if not self.is_partition_request: return response @@ -684,10 +903,11 @@ def after_success( # Grab the correct id out of the dummy request operation_id = response.request.headers.get("operation_id") - elements = self._await_elements(operation_id) + elements = await self._await_elements(operation_id) # if fails are disallowed, return the first failed response - if not self.allow_failed and self.api_failed_responses.get(operation_id): + allow_failed = self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) + if not allow_failed and self.api_failed_responses.get(operation_id): failure_response = self.api_failed_responses[operation_id][0] self._clear_operation(operation_id) @@ -701,27 +921,14 @@ def after_success( return new_response - def after_error( + async def after_error_async( self, hook_ctx: AfterErrorContext, response: Optional[httpx.Response], error: Optional[Exception], ) -> Union[Tuple[Optional[httpx.Response], Optional[Exception]], Exception]: - """This hook is unused. In the before hook, we return a mock request - for the SDK to run. This will take us right to the after_success hook - to await the split results. - - Args: - hook_ctx (AfterErrorContext): The AfterErrorContext object containing - information about the hook context. - response (Optional[httpx.Response]): The Response object representing - the response received before the exception occurred. - error (Optional[Exception]): The exception object that was thrown. - - Returns: - Union[Tuple[Optional[httpx.Response], Optional[Exception]], Exception]: - """ - return (response, error) + """Async version of after_error. Delegates to sync version since no async work needed.""" + return self.after_error(hook_ctx, response, error) def _clear_operation(self, operation_id: str) -> None: """ @@ -732,10 +939,11 @@ def _clear_operation(self, operation_id: str) -> None: """ self.coroutines_to_execute.pop(operation_id, None) self.api_successful_responses.pop(operation_id, None) + self.api_failed_responses.pop(operation_id, None) self.concurrency_level.pop(operation_id, None) - executor = self.executors.pop(operation_id, None) - if executor is not None: - executor.shutdown(wait=True) + self.allow_failed.pop(operation_id, None) + self.cache_tmp_data_feature.pop(operation_id, None) + self.cache_tmp_data_dir.pop(operation_id, None) tempdir = self.tempdirs.pop(operation_id, None) if tempdir: tempdir.cleanup() diff --git a/src/unstructured_client/_hooks/registration.py b/src/unstructured_client/_hooks/registration.py index 22cd276d..7fd0abea 100644 --- a/src/unstructured_client/_hooks/registration.py +++ b/src/unstructured_client/_hooks/registration.py @@ -36,10 +36,19 @@ def init_hooks(hooks: Hooks): # Register Before Request hooks hooks.register_before_request_hook(split_pdf_hook) - # Register After Error hooks + # Register After Success hooks hooks.register_after_success_hook(split_pdf_hook) hooks.register_after_success_hook(logger_hook) # Register After Error hooks hooks.register_after_error_hook(split_pdf_hook) - hooks.register_after_error_hook(logger_hook) + hooks.register_after_error_hook(logger_hook) + + # Register Async Before Request hooks + hooks.register_async_before_request_hook(split_pdf_hook) + + # Register Async After Success hooks + hooks.register_async_after_success_hook(split_pdf_hook) + + # Register Async After Error hooks + hooks.register_async_after_error_hook(split_pdf_hook) diff --git a/src/unstructured_client/_hooks/sdkhooks.py b/src/unstructured_client/_hooks/sdkhooks.py index cc59759f..bbebf332 100644 --- a/src/unstructured_client/_hooks/sdkhooks.py +++ b/src/unstructured_client/_hooks/sdkhooks.py @@ -1,6 +1,7 @@ """Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT.""" import httpx +import inspect from .types import ( SDKInitHook, BeforeRequestContext, @@ -9,11 +10,15 @@ AfterSuccessHook, AfterErrorContext, AfterErrorHook, + AsyncSDKInitHook, + AsyncBeforeRequestHook, + AsyncAfterSuccessHook, + AsyncAfterErrorHook, Hooks, ) from .registration import init_hooks -from typing import List, Optional, Tuple -from unstructured_client.httpclient import HttpClient +from typing import List, Optional, Tuple, Union +from unstructured_client.httpclient import AsyncHttpClient, HttpClient class SDKHooks(Hooks): @@ -22,6 +27,10 @@ def __init__(self) -> None: self.before_request_hooks: List[BeforeRequestHook] = [] self.after_success_hooks: List[AfterSuccessHook] = [] self.after_error_hooks: List[AfterErrorHook] = [] + self.async_sdk_init_hooks: List[AsyncSDKInitHook] = [] + self.async_before_request_hooks: List[AsyncBeforeRequestHook] = [] + self.async_after_success_hooks: List[AsyncAfterSuccessHook] = [] + self.async_after_error_hooks: List[AsyncAfterErrorHook] = [] init_hooks(self) def register_sdk_init_hook(self, hook: SDKInitHook) -> None: @@ -36,6 +45,18 @@ def register_after_success_hook(self, hook: AfterSuccessHook) -> None: def register_after_error_hook(self, hook: AfterErrorHook) -> None: self.after_error_hooks.append(hook) + def register_async_sdk_init_hook(self, hook: AsyncSDKInitHook) -> None: + self.async_sdk_init_hooks.append(hook) + + def register_async_before_request_hook(self, hook: AsyncBeforeRequestHook) -> None: + self.async_before_request_hooks.append(hook) + + def register_async_after_success_hook(self, hook: AsyncAfterSuccessHook) -> None: + self.async_after_success_hooks.append(hook) + + def register_async_after_error_hook(self, hook: AsyncAfterErrorHook) -> None: + self.async_after_error_hooks.append(hook) + def sdk_init(self, base_url: str, client: HttpClient) -> Tuple[str, HttpClient]: for hook in self.sdk_init_hooks: base_url, client = hook.sdk_init(base_url, client) @@ -74,3 +95,75 @@ def after_error( raise result response, error = result return response, error + + async def sdk_init_async( + self, base_url: str, client: AsyncHttpClient + ) -> Tuple[str, AsyncHttpClient]: + # Run async hooks first + for hook in self.async_sdk_init_hooks: + base_url, client = await hook.sdk_init_async(base_url, client) + # Then run sync hooks for backward compatibility (if hook has sync version) + for hook in self.sdk_init_hooks: + # Cast the client temporarily - sync hooks may not touch async client + base_url, _ = hook.sdk_init(base_url, client) # type: ignore + return base_url, client + + async def before_request_async( + self, hook_ctx: BeforeRequestContext, request: httpx.Request + ) -> httpx.Request: + # Run async hooks first + for hook in self.async_before_request_hooks: + out = await hook.before_request_async(hook_ctx, request) + if isinstance(out, Exception): + raise out + request = out + + # Then run sync hooks for backward compatibility + for hook in self.before_request_hooks: + out = hook.before_request(hook_ctx, request) + if isinstance(out, Exception): + raise out + request = out + + return request + + async def after_success_async( + self, hook_ctx: AfterSuccessContext, response: httpx.Response + ) -> httpx.Response: + # Run async hooks first + for hook in self.async_after_success_hooks: + out = await hook.after_success_async(hook_ctx, response) + if isinstance(out, Exception): + raise out + response = out + + # Then run sync hooks for backward compatibility + for hook in self.after_success_hooks: + out = hook.after_success(hook_ctx, response) + if isinstance(out, Exception): + raise out + response = out + + return response + + async def after_error_async( + self, + hook_ctx: AfterErrorContext, + response: Optional[httpx.Response], + error: Optional[Exception], + ) -> Tuple[Optional[httpx.Response], Optional[Exception]]: + # Run async hooks first + for hook in self.async_after_error_hooks: + result = await hook.after_error_async(hook_ctx, response, error) + if isinstance(result, Exception): + raise result + response, error = result + + # Then run sync hooks for backward compatibility + for hook in self.after_error_hooks: + result = hook.after_error(hook_ctx, response, error) + if isinstance(result, Exception): + raise result + response, error = result + + return response, error diff --git a/src/unstructured_client/_hooks/types.py b/src/unstructured_client/_hooks/types.py index a2840fe4..68176f5f 100644 --- a/src/unstructured_client/_hooks/types.py +++ b/src/unstructured_client/_hooks/types.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod import httpx from typing import Any, Callable, List, Optional, Tuple, Union -from unstructured_client.httpclient import HttpClient +from unstructured_client.httpclient import AsyncHttpClient, HttpClient from unstructured_client.sdkconfiguration import SDKConfiguration @@ -95,6 +95,41 @@ def after_error( pass +class AsyncSDKInitHook(ABC): + @abstractmethod + async def sdk_init_async( + self, base_url: str, client: AsyncHttpClient + ) -> Tuple[str, AsyncHttpClient]: + pass + + +class AsyncBeforeRequestHook(ABC): + @abstractmethod + async def before_request_async( + self, hook_ctx: BeforeRequestContext, request: httpx.Request + ) -> Union[httpx.Request, Exception]: + pass + + +class AsyncAfterSuccessHook(ABC): + @abstractmethod + async def after_success_async( + self, hook_ctx: AfterSuccessContext, response: httpx.Response + ) -> Union[httpx.Response, Exception]: + pass + + +class AsyncAfterErrorHook(ABC): + @abstractmethod + async def after_error_async( + self, + hook_ctx: AfterErrorContext, + response: Optional[httpx.Response], + error: Optional[Exception], + ) -> Union[Tuple[Optional[httpx.Response], Optional[Exception]], Exception]: + pass + + class Hooks(ABC): @abstractmethod def register_sdk_init_hook(self, hook: SDKInitHook): @@ -111,3 +146,19 @@ def register_after_success_hook(self, hook: AfterSuccessHook): @abstractmethod def register_after_error_hook(self, hook: AfterErrorHook): pass + + @abstractmethod + def register_async_sdk_init_hook(self, hook: AsyncSDKInitHook): + pass + + @abstractmethod + def register_async_before_request_hook(self, hook: AsyncBeforeRequestHook): + pass + + @abstractmethod + def register_async_after_success_hook(self, hook: AsyncAfterSuccessHook): + pass + + @abstractmethod + def register_async_after_error_hook(self, hook: AsyncAfterErrorHook): + pass diff --git a/src/unstructured_client/_version.py b/src/unstructured_client/_version.py index 699b7dfc..e17f20f9 100644 --- a/src/unstructured_client/_version.py +++ b/src/unstructured_client/_version.py @@ -3,10 +3,10 @@ import importlib.metadata __title__: str = "unstructured-client" -__version__: str = "0.42.9" -__openapi_doc_version__: str = "1.2.28" +__version__: str = "0.42.10" +__openapi_doc_version__: str = "1.2.31" __gen_version__: str = "2.680.0" -__user_agent__: str = "speakeasy-sdk/python 0.42.9 2.680.0 1.2.28 unstructured-client" +__user_agent__: str = "speakeasy-sdk/python 0.42.10 2.680.0 1.2.31 unstructured-client" try: if __package__ is not None: diff --git a/src/unstructured_client/basesdk.py b/src/unstructured_client/basesdk.py index a1c0f684..fcbb0099 100644 --- a/src/unstructured_client/basesdk.py +++ b/src/unstructured_client/basesdk.py @@ -300,7 +300,7 @@ async def do_request_async( async def do(): http_res = None try: - req = hooks.before_request(BeforeRequestContext(hook_ctx), request) + req = await hooks.before_request_async(BeforeRequestContext(hook_ctx), request) logger.debug( "Request:\nMethod: %s\nURL: %s\nHeaders: %s\nBody: %s", req.method, @@ -314,7 +314,7 @@ async def do(): http_res = await client.send(req, stream=stream) except Exception as e: - _, e = hooks.after_error(AfterErrorContext(hook_ctx), None, e) + _, e = await hooks.after_error_async(AfterErrorContext(hook_ctx), None, e) if e is not None: logger.debug("Request Exception", exc_info=True) raise e @@ -332,7 +332,7 @@ async def do(): ) if utils.match_status_codes(error_status_codes, http_res.status_code): - result, err = hooks.after_error( + result, err = await hooks.after_error_async( AfterErrorContext(hook_ctx), http_res, None ) if err is not None: @@ -354,6 +354,6 @@ async def do(): http_res = await do() if not utils.match_status_codes(error_status_codes, http_res.status_code): - http_res = hooks.after_success(AfterSuccessContext(hook_ctx), http_res) + http_res = await hooks.after_success_async(AfterSuccessContext(hook_ctx), http_res) return http_res