Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 63 additions & 2 deletions tests/integration/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import secrets
import string
import time
from collections.abc import AsyncIterator, Iterator
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, TypeVar, overload
from typing import TYPE_CHECKING, Any, Protocol, TypeVar, overload

import pytest

if TYPE_CHECKING:
from collections.abc import Coroutine
from collections.abc import Callable, Coroutine

# Environment variable names for test configuration
TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN'
Expand All @@ -20,6 +21,16 @@
T = TypeVar('T')


class _HasId(Protocol):
"""Items returned by collection `iterate()` endpoints all expose `.id`."""

@property
def id(self) -> str: ...


_HasIdT = TypeVar('_HasIdT', bound=_HasId)


# ============================================================================
# Data classes for test fixtures
# ============================================================================
Expand Down Expand Up @@ -108,6 +119,56 @@ async def maybe_sleep(seconds: float, *, is_async: bool) -> None:
time.sleep(seconds) # noqa: ASYNC251


async def collect_iterate_until_present(
iterator_factory: Callable[[], Iterator[_HasIdT] | AsyncIterator[_HasIdT]],
expected_ids: set[str],
*,
item_type: type[_HasIdT],
is_async: bool,
max_attempts: int = 5,
interval: float = 1.0,
) -> list[_HasIdT]:
"""Drain a collection `iterate()` until every expected ID is present.

Handles eventual consistency on listing endpoints: under parallel load a freshly
created resource may not appear in the listing for a short window. Each attempt
builds a fresh iterator via `iterator_factory`, drains it, and breaks early once
`expected_ids` is a subset of the collected items' `.id` values. The most recent
collection is returned regardless of whether the condition was met, so the caller
can run its own assertion with a helpful failure message.

Args:
iterator_factory: No-arg callable returning a fresh iterator on each call.
expected_ids: IDs that must all appear in the collected items.
item_type: Asserted to match the runtime type of each yielded item.
is_async: Whether the iterator is async (and so are sleeps).
max_attempts: Maximum number of polling rounds.
interval: Seconds to sleep before each attempt.

Returns:
The most recently collected items.
"""
collected: list[_HasIdT] = []
for attempt in range(max_attempts):
if attempt > 0:
await maybe_sleep(interval, is_async=is_async)
iterator = iterator_factory()
collected = []
if is_async:
assert isinstance(iterator, AsyncIterator)
async for item in iterator:
assert isinstance(item, item_type)
collected.append(item)
else:
assert isinstance(iterator, Iterator)
for item in iterator:
assert isinstance(item, item_type)
collected.append(item)
if expected_ids.issubset(item.id for item in collected):
break
return collected


# ============================================================================
# Pytest markers and parametrization
# ============================================================================
Expand Down
27 changes: 13 additions & 14 deletions tests/integration/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@
import impit
import pytest

from ._utils import DatasetFixture, get_random_resource_name, maybe_await, maybe_sleep
from ._utils import (
DatasetFixture,
collect_iterate_until_present,
get_random_resource_name,
maybe_await,
maybe_sleep,
)
from apify_client._models import Dataset, DatasetListItem, DatasetStatistics, ListOfDatasets
from apify_client._resource_clients.dataset import DatasetItemsPage
from apify_client.errors import ApifyApiError
Expand Down Expand Up @@ -432,19 +438,12 @@ async def test_dataset_collection_iterate(client: ApifyClient | ApifyClientAsync
created_ids.append(dataset.id)

try:
iterator = client.datasets().iterate(desc=True)
collected: list[DatasetListItem] = []
if is_async:
assert isinstance(iterator, AsyncIterator)
async for ds in iterator:
assert isinstance(ds, DatasetListItem)
collected.append(ds)
else:
assert isinstance(iterator, Iterator)
for ds in iterator:
assert isinstance(ds, DatasetListItem)
collected.append(ds)

collected = await collect_iterate_until_present(
lambda: client.datasets().iterate(desc=True),
set(created_ids),
item_type=DatasetListItem,
is_async=is_async,
)
collected_ids = {ds.id for ds in collected}
for created_id in created_ids:
assert created_id in collected_ids
Expand Down
27 changes: 13 additions & 14 deletions tests/integration/test_key_value_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
import impit
import pytest

from ._utils import KvsFixture, get_random_resource_name, maybe_await, maybe_sleep
from ._utils import (
KvsFixture,
collect_iterate_until_present,
get_random_resource_name,
maybe_await,
maybe_sleep,
)
from apify_client._models import KeyValueStore, KeyValueStoreKey, ListOfKeys, ListOfKeyValueStores
from apify_client.errors import ApifyApiError

Expand Down Expand Up @@ -555,19 +561,12 @@ async def test_key_value_store_collection_iterate(client: ApifyClient | ApifyCli
created_ids.append(kvs.id)

try:
iterator = client.key_value_stores().iterate(desc=True)
collected: list[KeyValueStore] = []
if is_async:
assert isinstance(iterator, AsyncIterator)
async for kvs in iterator:
assert isinstance(kvs, KeyValueStore)
collected.append(kvs)
else:
assert isinstance(iterator, Iterator)
for kvs in iterator:
assert isinstance(kvs, KeyValueStore)
collected.append(kvs)

collected = await collect_iterate_until_present(
lambda: client.key_value_stores().iterate(desc=True),
set(created_ids),
item_type=KeyValueStore,
is_async=is_async,
)
collected_ids = {kvs.id for kvs in collected}
for created_id in created_ids:
assert created_id in collected_ids
Expand Down
27 changes: 13 additions & 14 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
from datetime import timedelta
from typing import TYPE_CHECKING

from ._utils import get_random_resource_name, get_random_string, maybe_await, maybe_sleep
from ._utils import (
collect_iterate_until_present,
get_random_resource_name,
get_random_string,
maybe_await,
maybe_sleep,
)
from apify_client._models import (
BatchAddResult,
BatchDeleteResult,
Expand Down Expand Up @@ -615,19 +621,12 @@ async def test_request_queue_collection_iterate(client: ApifyClient | ApifyClien
created_ids.append(rq.id)

try:
iterator = client.request_queues().iterate(desc=True)
collected: list[RequestQueueShort] = []
if is_async:
assert isinstance(iterator, AsyncIterator)
async for rq in iterator:
assert isinstance(rq, RequestQueueShort)
collected.append(rq)
else:
assert isinstance(iterator, Iterator)
for rq in iterator:
assert isinstance(rq, RequestQueueShort)
collected.append(rq)

collected = await collect_iterate_until_present(
lambda: client.request_queues().iterate(desc=True),
set(created_ids),
item_type=RequestQueueShort,
is_async=is_async,
)
collected_ids = {rq.id for rq in collected}
for rq_id in created_ids:
assert rq_id in collected_ids
Expand Down
22 changes: 7 additions & 15 deletions tests/integration/test_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

from __future__ import annotations

from collections.abc import AsyncIterator, Iterator
from typing import TYPE_CHECKING

from ._utils import get_random_resource_name, maybe_await
from ._utils import collect_iterate_until_present, get_random_resource_name, maybe_await
from apify_client._models import Actor, ListOfSchedules, Schedule, ScheduleActionRunActor, ScheduleShort

if TYPE_CHECKING:
Expand Down Expand Up @@ -193,19 +192,12 @@ async def test_schedule_collection_iterate(client: ApifyClient | ApifyClientAsyn
created_ids.append(schedule.id)

try:
iterator = client.schedules().iterate()
collected: list[ScheduleShort] = []
if is_async:
assert isinstance(iterator, AsyncIterator)
async for s in iterator:
assert isinstance(s, ScheduleShort)
collected.append(s)
else:
assert isinstance(iterator, Iterator)
for s in iterator:
assert isinstance(s, ScheduleShort)
collected.append(s)

collected = await collect_iterate_until_present(
lambda: client.schedules().iterate(),
set(created_ids),
item_type=ScheduleShort,
is_async=is_async,
)
collected_ids = {s.id for s in collected}
for sched_id in created_ids:
assert sched_id in collected_ids
Expand Down
21 changes: 7 additions & 14 deletions tests/integration/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import timedelta
from typing import TYPE_CHECKING

from ._utils import get_random_resource_name, maybe_await
from ._utils import collect_iterate_until_present, get_random_resource_name, maybe_await
from apify_client._models import Actor, ListOfRuns, ListOfTasks, ListOfWebhooks, Run, RunShort, Task, TaskShort

if TYPE_CHECKING:
Expand Down Expand Up @@ -365,19 +365,12 @@ async def test_task_collection_iterate(client: ApifyClient | ApifyClientAsync, *
created_ids.append(task.id)

try:
iterator = client.tasks().iterate(desc=True)
collected: list[TaskShort] = []
if is_async:
assert isinstance(iterator, AsyncIterator)
async for t in iterator:
assert isinstance(t, TaskShort)
collected.append(t)
else:
assert isinstance(iterator, Iterator)
for t in iterator:
assert isinstance(t, TaskShort)
collected.append(t)

collected = await collect_iterate_until_present(
lambda: client.tasks().iterate(desc=True),
set(created_ids),
item_type=TaskShort,
is_async=is_async,
)
collected_ids = {t.id for t in collected}
for task_id in created_ids:
assert task_id in collected_ids
Expand Down
21 changes: 7 additions & 14 deletions tests/integration/test_webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from apify_client import ApifyClient, ApifyClientAsync


from ._utils import maybe_await
from ._utils import collect_iterate_until_present, maybe_await
from apify_client._models import (
ListOfRuns,
ListOfWebhookDispatches,
Expand Down Expand Up @@ -219,19 +219,12 @@ async def test_webhook_collection_iterate(client: ApifyClient | ApifyClientAsync
assert len(set(created_ids)) == 3

try:
iterator = client.webhooks().iterate(desc=True)
collected: list[WebhookShort] = []
if is_async:
assert isinstance(iterator, AsyncIterator)
async for w in iterator:
assert isinstance(w, WebhookShort)
collected.append(w)
else:
assert isinstance(iterator, Iterator)
for w in iterator:
assert isinstance(w, WebhookShort)
collected.append(w)

collected = await collect_iterate_until_present(
lambda: client.webhooks().iterate(desc=True),
set(created_ids),
item_type=WebhookShort,
is_async=is_async,
)
collected_ids = {w.id for w in collected}
for webhook_id in created_ids:
assert webhook_id in collected_ids
Expand Down
Loading