Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from abc import ABC
import asyncio
from contextlib import contextmanager
import multiprocessing
import os
import typing as _t
from unittest.mock import patch

Expand All @@ -21,6 +21,16 @@
from plugboard.utils.settings import Settings


@contextmanager
def override_settings(settings: Settings) -> _t.Iterator[None]:
"""Temporarily override DI settings for a test and always reset the override."""
DI.settings.override_sync(settings)
try:
yield
finally:
DI.settings.reset_override_sync()


@pytest.hookimpl(optionalhook=True)
def pytest_asyncio_loop_factories() -> dict[str, _t.Callable[[], asyncio.AbstractEventLoop]]:
"""Configure pytest-asyncio to create event loops with uvloop."""
Expand Down Expand Up @@ -72,16 +82,11 @@ async def DI_teardown() -> _t.AsyncGenerator[None, None]:
def zmq_connector_cls(zmq_pubsub_proxy: bool) -> _t.Iterator[_t.Type[ZMQConnector]]:
"""Returns the ZMQConnector class with the specified proxy setting.

Patches the env var `PLUGBOARD_FLAGS_ZMQ_PUBSUB_PROXY` to control the proxy setting.
Overrides settings to control the proxy setting without mutating process env.
"""
with patch.dict(
os.environ,
{"PLUGBOARD_FLAGS_ZMQ_PUBSUB_PROXY": str(zmq_pubsub_proxy)},
):
testing_settings = Settings()
DI.settings.override_sync(testing_settings)
testing_settings = Settings.model_validate({"flags": {"zmq_pubsub_proxy": zmq_pubsub_proxy}})
with override_settings(testing_settings):
yield ZMQConnector
DI.settings.reset_override_sync()


class ComponentTestHelper(Component, ABC):
Expand Down
12 changes: 4 additions & 8 deletions tests/integration/test_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from plugboard.connector.redis_channel import RedisConnector
from plugboard.utils import DI
from plugboard.utils.settings import Settings
from tests.conftest import override_settings
from tests.unit.test_channel import ( # noqa: F401
TEST_ITEMS,
test_channel,
Expand All @@ -28,16 +29,11 @@
def zmq_connector_cls(zmq_pubsub_proxy: bool) -> _t.Iterator[_t.Type[ZMQConnector]]:
"""Returns the ZMQConnector class with the specified proxy setting.

Patches the env var `PLUGBOARD_FLAGS_ZMQ_PUBSUB_PROXY` to control the proxy setting.
Overrides settings to control the proxy setting without mutating process env.
"""
with patch.dict(
os.environ,
{"PLUGBOARD_FLAGS_ZMQ_PUBSUB_PROXY": str(zmq_pubsub_proxy)},
):
testing_settings = Settings()
DI.settings.override_sync(testing_settings)
testing_settings = Settings.model_validate({"flags": {"zmq_pubsub_proxy": zmq_pubsub_proxy}})
with override_settings(testing_settings):
yield ZMQConnector
DI.settings.reset_override_sync()


@pytest_cases.fixture
Expand Down
15 changes: 4 additions & 11 deletions tests/integration/test_connector_pubsub.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
"""Integration tests for pubsub mode connector against broker/messaging infrastructure."""

import os
import typing as _t
from unittest.mock import patch

import pytest
import pytest_cases
Expand All @@ -13,8 +11,8 @@
ZMQConnector,
)
from plugboard.connector.redis_channel import RedisConnector
from plugboard.utils.di import DI
from plugboard.utils.settings import Settings
from tests.conftest import override_settings
from tests.unit.test_connector_pubsub import ( # noqa: F401
_HASH_SEED,
TEST_ITEMS,
Expand All @@ -29,16 +27,11 @@
def zmq_connector_cls(zmq_pubsub_proxy: bool) -> _t.Iterator[_t.Type[ZMQConnector]]:
"""Returns the ZMQConnector class with the specified proxy setting.

Patches the env var `PLUGBOARD_FLAGS_ZMQ_PUBSUB_PROXY` to control the proxy setting.
Overrides settings to control the proxy setting without mutating process env.
"""
with patch.dict(
os.environ,
{"PLUGBOARD_FLAGS_ZMQ_PUBSUB_PROXY": str(zmq_pubsub_proxy)},
):
testing_settings = Settings()
DI.settings.override_sync(testing_settings)
testing_settings = Settings.model_validate({"flags": {"zmq_pubsub_proxy": zmq_pubsub_proxy}})
with override_settings(testing_settings):
yield ZMQConnector
DI.settings.reset_override_sync()


@pytest_cases.fixture
Expand Down
33 changes: 32 additions & 1 deletion tests/smoke/test_examples_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@


SMOKE_TEST_TIMEOUT = 90
MAX_TRANSIENT_NETWORK_RETRIES = 1
PROJECT_ROOT = Path(__file__).parent.parent.parent
TRANSIENT_NETWORK_ERROR_SIGNATURES = (
"ConnectError",
"ReadTimeout",
"RemoteProtocolError",
"Server disconnected without sending a response",
)


@pytest.fixture(scope="module", autouse=True)
Expand Down Expand Up @@ -51,7 +58,7 @@ def test_tutorial_file_runs(file_and_dir: Tuple[Path, Path]) -> None:
"""Test that a tutorial file runs without errors."""
py_file, working_dir = file_and_dir

try:
def _run_tutorial() -> tuple[subprocess.Popen[str], str, str]:
process = subprocess.Popen( # noqa: S603
[sys.executable, py_file.name],
cwd=working_dir,
Expand All @@ -67,8 +74,32 @@ def test_tutorial_file_runs(file_and_dir: Tuple[Path, Path]) -> None:
pytest.skip(
f"{py_file.relative_to(PROJECT_ROOT)} timed out after {SMOKE_TEST_TIMEOUT} seconds"
)
return process, stdout, stderr

def _has_transient_network_error(*outputs: str) -> bool:
for output in outputs:
for signature in TRANSIENT_NETWORK_ERROR_SIGNATURES:
if signature in output:
return True
return False

try:
process, stdout, stderr = _run_tutorial()
retries = 0
while (
process.returncode != 0
and retries < MAX_TRANSIENT_NETWORK_RETRIES
and _has_transient_network_error(stdout, stderr)
):
retries += 1
process, stdout, stderr = _run_tutorial()

if process.returncode != 0:
if _has_transient_network_error(stdout, stderr):
pytest.skip(
f"{py_file.relative_to(PROJECT_ROOT)} failed due to a transient external "
"network error"
)
error_msg = (
f"Tutorial file {py_file.relative_to(PROJECT_ROOT)} "
f"failed to run successfully.\n"
Expand Down
15 changes: 5 additions & 10 deletions tests/unit/test_channel.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
"""Unit tests for channels."""

import asyncio
import os
import typing as _t
from unittest.mock import patch

import pytest
import pytest_cases
Expand All @@ -21,6 +19,7 @@
from plugboard.schemas import ConnectorMode, ConnectorSpec
from plugboard.utils.di import DI
from plugboard.utils.settings import Settings
from tests.conftest import override_settings


TEST_ITEMS = [
Expand All @@ -39,16 +38,11 @@
def zmq_connector_cls(zmq_pubsub_proxy: bool) -> _t.Iterator[_t.Type[ZMQConnector]]:
"""Returns the ZMQConnector class with the specified proxy setting.

Patches the env var `PLUGBOARD_FLAGS_ZMQ_PUBSUB_PROXY` to control the proxy setting.
Overrides settings to control the proxy setting without mutating process env.
"""
with patch.dict(
os.environ,
{"PLUGBOARD_FLAGS_ZMQ_PUBSUB_PROXY": str(zmq_pubsub_proxy)},
):
testing_settings = Settings()
DI.settings.override_sync(testing_settings)
testing_settings = Settings.model_validate({"flags": {"zmq_pubsub_proxy": zmq_pubsub_proxy}})
with override_settings(testing_settings):
yield ZMQConnector
DI.settings.reset_override_sync()


@pytest_cases.fixture
Expand Down Expand Up @@ -98,6 +92,7 @@ def connector_cls_mp(_connector_cls_mp: type[Connector]) -> type[Connector]:


@pytest.mark.asyncio
@pytest.mark.flaky(reruns=2)
async def test_multiprocessing_channel(
connector_cls_mp: type[Connector], ray_ctx: None, job_id_ctx: str
) -> None:
Expand Down
15 changes: 4 additions & 11 deletions tests/unit/test_connector_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import asyncio
from functools import lru_cache
from itertools import cycle
import os
import random
import string
import time
import typing as _t
from unittest.mock import patch

import pytest
import pytest_cases
Expand All @@ -21,25 +19,20 @@
)
from plugboard.exceptions import ChannelClosedError
from plugboard.schemas import ConnectorMode, ConnectorSpec
from plugboard.utils.di import DI
from plugboard.utils.settings import Settings
from tests.conftest import override_settings


@pytest_cases.fixture
@pytest_cases.parametrize(zmq_pubsub_proxy=[False])
def zmq_connector_cls(zmq_pubsub_proxy: bool) -> _t.Iterator[_t.Type[ZMQConnector]]:
"""Returns the ZMQConnector class with the specified proxy setting.

Patches the env var `PLUGBOARD_FLAGS_ZMQ_PUBSUB_PROXY` to control the proxy setting.
Overrides settings to control the proxy setting without mutating process env.
"""
with patch.dict(
os.environ,
{"PLUGBOARD_FLAGS_ZMQ_PUBSUB_PROXY": str(zmq_pubsub_proxy)},
):
testing_settings = Settings()
DI.settings.override_sync(testing_settings)
testing_settings = Settings.model_validate({"flags": {"zmq_pubsub_proxy": zmq_pubsub_proxy}})
with override_settings(testing_settings):
yield ZMQConnector
DI.settings.reset_override_sync()


@pytest_cases.fixture
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/test_state_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def state_backend_cls(request: pytest.FixtureRequest) -> _t.Type[StateBackend]:


@pytest.mark.asyncio
@pytest.mark.flaky(reruns=2)
@pytest.mark.parametrize(
"job_id_fixture, metadata, exc_ctx",
[
Expand Down Expand Up @@ -85,6 +86,7 @@ async def test_state_backend_init(


@pytest.mark.asyncio
@pytest.mark.flaky(reruns=2)
async def test_state_backend_init_with_existing_job(
datetime_now: str,
state_backend_cls: _t.Type[StateBackend],
Expand Down
Loading