diff --git a/tests/integration/_utils.py b/tests/integration/_utils.py
index 8a3e1f1f..4412808a 100644
--- a/tests/integration/_utils.py
+++ b/tests/integration/_utils.py
@@ -4,13 +4,14 @@
 import secrets
 import string
 import time
+from collections.abc import AsyncIterator, Iterator
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, Any, TypeVar, overload
+from typing import TYPE_CHECKING, Any, Protocol, TypeVar, overload
 
 import pytest
 
 if TYPE_CHECKING:
-    from collections.abc import Coroutine
+    from collections.abc import Callable, Coroutine
 
 # Environment variable names for test configuration
 TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN'
@@ -20,6 +21,16 @@
 T = TypeVar('T')
 
 
+class _HasId(Protocol):
+    """Items returned by collection `iterate()` endpoints all expose `.id`."""
+
+    @property
+    def id(self) -> str: ...
+
+
+_HasIdT = TypeVar('_HasIdT', bound=_HasId)
+
+
 # ============================================================================
 # Data classes for test fixtures
 # ============================================================================
@@ -108,6 +119,62 @@ async def maybe_sleep(seconds: float, *, is_async: bool) -> None:
         time.sleep(seconds)  # noqa: ASYNC251
 
 
+async def collect_iterate_until_present(
+    iterator_factory: Callable[[], Iterator[_HasIdT] | AsyncIterator[_HasIdT]],
+    expected_ids: set[str],
+    *,
+    item_type: type[_HasIdT],
+    is_async: bool,
+    max_attempts: int = 5,
+    interval: float = 1.0,
+) -> list[_HasIdT]:
+    """Drain a collection `iterate()` until every expected ID is present.
+
+    Handles eventual consistency on listing endpoints: under parallel load a freshly
+    created resource may not appear in the listing for a short window. Each attempt
+    builds a fresh iterator via `iterator_factory`, drains it, and breaks early once
+    `expected_ids` is a subset of the collected items' `.id` values. The most recent
+    collection is returned regardless of whether the condition was met, so the caller
+    can run its own assertion with a helpful failure message.
+
+    Args:
+        iterator_factory: No-arg callable returning a fresh iterator on each call.
+        expected_ids: IDs that must all appear in the collected items.
+        item_type: Asserted to match the runtime type of each yielded item.
+        is_async: Whether the iterator is async (and so are sleeps).
+        max_attempts: Maximum number of polling rounds; must be at least 1.
+        interval: Seconds to sleep between attempts (there is no sleep before the first one).
+
+    Returns:
+        The most recently collected items.
+
+    Raises:
+        ValueError: If `max_attempts` is less than 1, since the listing would never be queried.
+    """
+    if max_attempts < 1:
+        raise ValueError(f'max_attempts must be at least 1, got {max_attempts}')
+
+    collected: list[_HasIdT] = []
+    for attempt in range(max_attempts):
+        if attempt > 0:
+            await maybe_sleep(interval, is_async=is_async)
+        iterator = iterator_factory()
+        collected = []
+        if is_async:
+            assert isinstance(iterator, AsyncIterator)
+            async for item in iterator:
+                assert isinstance(item, item_type)
+                collected.append(item)
+        else:
+            assert isinstance(iterator, Iterator)
+            for item in iterator:
+                assert isinstance(item, item_type)
+                collected.append(item)
+        if expected_ids.issubset(item.id for item in collected):
+            break
+    return collected
+
+
 # ============================================================================
 # Pytest markers and parametrization
 # ============================================================================
diff --git a/tests/integration/test_dataset.py b/tests/integration/test_dataset.py
index e4683107..c628ac99 100644
--- a/tests/integration/test_dataset.py
+++ b/tests/integration/test_dataset.py
@@ -11,7 +11,13 @@
 import impit
 import pytest
 
-from ._utils import DatasetFixture, get_random_resource_name, maybe_await, maybe_sleep
+from ._utils import (
+    DatasetFixture,
+    collect_iterate_until_present,
+    get_random_resource_name,
+    maybe_await,
+    maybe_sleep,
+)
 from apify_client._models import Dataset, DatasetListItem, DatasetStatistics, ListOfDatasets
 from apify_client._resource_clients.dataset import DatasetItemsPage
 from apify_client.errors import ApifyApiError
@@ -432,19 +438,12 @@ async def test_dataset_collection_iterate(client: ApifyClient | ApifyClientAsync
         created_ids.append(dataset.id)
 
     try:
-        iterator = client.datasets().iterate(desc=True)
-        collected: list[DatasetListItem] = []
-        if is_async:
-            assert isinstance(iterator, AsyncIterator)
-            async for ds in iterator:
-                assert isinstance(ds, DatasetListItem)
-                collected.append(ds)
-        else:
-            assert isinstance(iterator, Iterator)
-            for ds in iterator:
-                assert isinstance(ds, DatasetListItem)
-                collected.append(ds)
-
+        collected = await collect_iterate_until_present(
+            lambda: client.datasets().iterate(desc=True),
+            set(created_ids),
+            item_type=DatasetListItem,
+            is_async=is_async,
+        )
         collected_ids = {ds.id for ds in collected}
         for created_id in created_ids:
             assert created_id in collected_ids
diff --git a/tests/integration/test_key_value_store.py b/tests/integration/test_key_value_store.py
index d85f7f0c..37557ecb 100644
--- a/tests/integration/test_key_value_store.py
+++ b/tests/integration/test_key_value_store.py
@@ -10,7 +10,13 @@
 import impit
 import pytest
 
-from ._utils import KvsFixture, get_random_resource_name, maybe_await, maybe_sleep
+from ._utils import (
+    KvsFixture,
+    collect_iterate_until_present,
+    get_random_resource_name,
+    maybe_await,
+    maybe_sleep,
+)
 from apify_client._models import KeyValueStore, KeyValueStoreKey, ListOfKeys, ListOfKeyValueStores
 from apify_client.errors import ApifyApiError
 
@@ -555,19 +561,12 @@ async def test_key_value_store_collection_iterate(client: ApifyClient | ApifyCli
         created_ids.append(kvs.id)
 
     try:
-        iterator = client.key_value_stores().iterate(desc=True)
-        collected: list[KeyValueStore] = []
-        if is_async:
-            assert isinstance(iterator, AsyncIterator)
-            async for kvs in iterator:
-                assert isinstance(kvs, KeyValueStore)
-                collected.append(kvs)
-        else:
-            assert isinstance(iterator, Iterator)
-            for kvs in iterator:
-                assert isinstance(kvs, KeyValueStore)
-                collected.append(kvs)
-
+        collected = await collect_iterate_until_present(
+            lambda: client.key_value_stores().iterate(desc=True),
+            set(created_ids),
+            item_type=KeyValueStore,
+            is_async=is_async,
+        )
         collected_ids = {kvs.id for kvs in collected}
         for created_id in created_ids:
             assert created_id in collected_ids
diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py
index 17d8138c..d0986903 100644
--- a/tests/integration/test_request_queue.py
+++ b/tests/integration/test_request_queue.py
@@ -6,7 +6,13 @@
 from datetime import timedelta
 from typing import TYPE_CHECKING
 
-from ._utils import get_random_resource_name, get_random_string, maybe_await, maybe_sleep
+from ._utils import (
+    collect_iterate_until_present,
+    get_random_resource_name,
+    get_random_string,
+    maybe_await,
+    maybe_sleep,
+)
 from apify_client._models import (
     BatchAddResult,
     BatchDeleteResult,
@@ -615,19 +621,12 @@ async def test_request_queue_collection_iterate(client: ApifyClient | ApifyClien
         created_ids.append(rq.id)
 
     try:
-        iterator = client.request_queues().iterate(desc=True)
-        collected: list[RequestQueueShort] = []
-        if is_async:
-            assert isinstance(iterator, AsyncIterator)
-            async for rq in iterator:
-                assert isinstance(rq, RequestQueueShort)
-                collected.append(rq)
-        else:
-            assert isinstance(iterator, Iterator)
-            for rq in iterator:
-                assert isinstance(rq, RequestQueueShort)
-                collected.append(rq)
-
+        collected = await collect_iterate_until_present(
+            lambda: client.request_queues().iterate(desc=True),
+            set(created_ids),
+            item_type=RequestQueueShort,
+            is_async=is_async,
+        )
         collected_ids = {rq.id for rq in collected}
         for rq_id in created_ids:
             assert rq_id in collected_ids
diff --git a/tests/integration/test_schedule.py b/tests/integration/test_schedule.py
index 52017698..11cfed8b 100644
--- a/tests/integration/test_schedule.py
+++ b/tests/integration/test_schedule.py
@@ -2,10 +2,9 @@
 
 from __future__ import annotations
 
-from collections.abc import AsyncIterator, Iterator
 from typing import TYPE_CHECKING
 
-from ._utils import get_random_resource_name, maybe_await
+from ._utils import collect_iterate_until_present, get_random_resource_name, maybe_await
 from apify_client._models import Actor, ListOfSchedules, Schedule, ScheduleActionRunActor, ScheduleShort
 
 if TYPE_CHECKING:
@@ -193,19 +192,12 @@ async def test_schedule_collection_iterate(client: ApifyClient | ApifyClientAsyn
         created_ids.append(schedule.id)
 
     try:
-        iterator = client.schedules().iterate()
-        collected: list[ScheduleShort] = []
-        if is_async:
-            assert isinstance(iterator, AsyncIterator)
-            async for s in iterator:
-                assert isinstance(s, ScheduleShort)
-                collected.append(s)
-        else:
-            assert isinstance(iterator, Iterator)
-            for s in iterator:
-                assert isinstance(s, ScheduleShort)
-                collected.append(s)
-
+        collected = await collect_iterate_until_present(
+            lambda: client.schedules().iterate(),
+            set(created_ids),
+            item_type=ScheduleShort,
+            is_async=is_async,
+        )
         collected_ids = {s.id for s in collected}
         for sched_id in created_ids:
             assert sched_id in collected_ids
diff --git a/tests/integration/test_task.py b/tests/integration/test_task.py
index 2a776bc3..d73845ec 100644
--- a/tests/integration/test_task.py
+++ b/tests/integration/test_task.py
@@ -6,7 +6,7 @@
 from datetime import timedelta
 from typing import TYPE_CHECKING
 
-from ._utils import get_random_resource_name, maybe_await
+from ._utils import collect_iterate_until_present, get_random_resource_name, maybe_await
 from apify_client._models import Actor, ListOfRuns, ListOfTasks, ListOfWebhooks, Run, RunShort, Task, TaskShort
 
 if TYPE_CHECKING:
@@ -365,19 +365,12 @@ async def test_task_collection_iterate(client: ApifyClient | ApifyClientAsync, *
         created_ids.append(task.id)
 
     try:
-        iterator = client.tasks().iterate(desc=True)
-        collected: list[TaskShort] = []
-        if is_async:
-            assert isinstance(iterator, AsyncIterator)
-            async for t in iterator:
-                assert isinstance(t, TaskShort)
-                collected.append(t)
-        else:
-            assert isinstance(iterator, Iterator)
-            for t in iterator:
-                assert isinstance(t, TaskShort)
-                collected.append(t)
-
+        collected = await collect_iterate_until_present(
+            lambda: client.tasks().iterate(desc=True),
+            set(created_ids),
+            item_type=TaskShort,
+            is_async=is_async,
+        )
         collected_ids = {t.id for t in collected}
         for task_id in created_ids:
             assert task_id in collected_ids
diff --git a/tests/integration/test_webhook.py b/tests/integration/test_webhook.py
index 343630f0..044f65d2 100644
--- a/tests/integration/test_webhook.py
+++ b/tests/integration/test_webhook.py
@@ -9,7 +9,7 @@
 
 from apify_client import ApifyClient, ApifyClientAsync
 
-from ._utils import maybe_await
+from ._utils import collect_iterate_until_present, maybe_await
 from apify_client._models import (
     ListOfRuns,
     ListOfWebhookDispatches,
@@ -219,19 +219,12 @@ async def test_webhook_collection_iterate(client: ApifyClient | ApifyClientAsync
     assert len(set(created_ids)) == 3
 
     try:
-        iterator = client.webhooks().iterate(desc=True)
-        collected: list[WebhookShort] = []
-        if is_async:
-            assert isinstance(iterator, AsyncIterator)
-            async for w in iterator:
-                assert isinstance(w, WebhookShort)
-                collected.append(w)
-        else:
-            assert isinstance(iterator, Iterator)
-            for w in iterator:
-                assert isinstance(w, WebhookShort)
-                collected.append(w)
-
+        collected = await collect_iterate_until_present(
+            lambda: client.webhooks().iterate(desc=True),
+            set(created_ids),
+            item_type=WebhookShort,
+            is_async=is_async,
+        )
         collected_ids = {w.id for w in collected}
         for webhook_id in created_ids:
             assert webhook_id in collected_ids