diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8261bd95..8eaad8b0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,7 @@ jobs: with: test_service_port: 9000 token: ${{ secrets.GITHUB_TOKEN }} - version: v3.0.0-alpha.4 + version: v3.0.0-alpha.6 enable_persistence_tests: "true" windows: diff --git a/contract-tests/client_entity.py b/contract-tests/client_entity.py index ae1b9eea..e719d845 100644 --- a/contract-tests/client_entity.py +++ b/contract-tests/client_entity.py @@ -62,7 +62,6 @@ def __init__(self, tag, config): sync_configs = datasystem_config.get('synchronizers') if sync_configs is not None: sync_builders = [] - fallback_builder = None for sync_config in sync_configs: streaming = sync_config.get('streaming') @@ -79,14 +78,18 @@ def __init__(self, tag, config): _set_optional_time(polling, "pollIntervalMs", builder.poll_interval) sync_builders.append(builder) - fallback_builder = fdv1_fallback_ds_builder() - _set_optional_value(polling, "baseUri", fallback_builder.base_uri) - _set_optional_time(polling, "pollIntervalMs", fallback_builder.poll_interval) - if sync_builders: datasystem.synchronizers(*sync_builders) - if fallback_builder is not None: - datasystem.fdv1_compatible_synchronizer(fallback_builder) + + # The FDv1 Fallback Synchronizer is engaged only in response to a + # server-directed FDv1 Fallback Directive; it is configured separately + # from the FDv2 Primary/Fallback synchronizer chain. + fdv1_fallback_config = datasystem_config.get('fdv1Fallback') + if fdv1_fallback_config is not None: + fallback_builder = fdv1_fallback_ds_builder() + _set_optional_value(fdv1_fallback_config, "baseUri", fallback_builder.base_uri) + _set_optional_time(fdv1_fallback_config, "pollIntervalMs", fallback_builder.poll_interval) + datasystem.fdv1_compatible_synchronizer(fallback_builder) if datasystem_config.get("payloadFilter") is not None: opts["payload_filter_key"] = datasystem_config["payloadFilter"] diff --git a/contract-tests/service.py b/contract-tests/service.py index 7b023bcf..342c3870 100644 --- a/contract-tests/service.py +++ b/contract-tests/service.py @@ -84,6 +84,7 @@ def status(): 'persistent-data-store-consul', 'flag-change-listeners', 'flag-value-change-listeners', + 'fdv1-fallback', ] } return json.dumps(body), 200, {'Content-type': 'application/json'} diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py index ff9edecd..326a3e2b 100644 --- a/ldclient/impl/datasourcev2/polling.py +++ b/ldclient/impl/datasourcev2/polling.py @@ -139,7 +139,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: yield Update( state=DataSourceState.OFF, error=error_info, - revert_to_fdv1=True, + fallback_to_fdv1=True, environment_id=envid, ) break @@ -168,6 +168,17 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: message=result.error, ) + # Even a non-HTTP error (e.g. malformed JSON) can carry the fallback + # header. If so, halt rather than retrying the FDv2 endpoint. + if fallback: + yield Update( + state=DataSourceState.OFF, + error=error_info, + fallback_to_fdv1=True, + environment_id=envid, + ) + break + yield Update( state=DataSourceState.INTERRUPTED, error=error_info, @@ -179,7 +190,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: state=DataSourceState.VALID, change_set=change_set, environment_id=headers.get(_LD_ENVID_HEADER), - revert_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true' + fallback_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true' ) if self._interrupt_event.wait(self._poll_interval): @@ -204,13 +215,18 @@ def _poll(self, ss: SelectorStore) -> BasisResult: if is_http_error_recoverable(status_code): log.warning(http_error_message_result) + # Forward any response headers so callers (e.g. FDv2 datasystem) + # can read the X-LD-FD-Fallback directive even on error. return _Fail( - error=http_error_message_result, exception=result.exception + error=http_error_message_result, + exception=result.exception, + headers=result.headers, ) return _Fail( error=result.error or "Failed to request payload", exception=result.exception, + headers=result.headers, ) (change_set, headers) = result.value @@ -223,6 +239,7 @@ def _poll(self, ss: SelectorStore) -> BasisResult: change_set=change_set, persist=change_set.selector.is_defined(), environment_id=env_id, + fallback_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true', ) return _Success(value=basis) diff --git a/ldclient/impl/datasourcev2/streaming.py b/ldclient/impl/datasourcev2/streaming.py index 065f7917..8180c901 100644 --- a/ldclient/impl/datasourcev2/streaming.py +++ b/ldclient/impl/datasourcev2/streaming.py @@ -8,6 +8,7 @@ from typing import Callable, Generator, Optional, Tuple from urllib import parse +import urllib3 from ld_eventsource import SSEClient from ld_eventsource.actions import Event, Fault, Start from ld_eventsource.config import ( @@ -57,7 +58,10 @@ STREAMING_ENDPOINT = "/sdk/stream" -SseClientBuilder = Callable[[str, HTTPConfig, float, Config, SelectorStore], SSEClient] +SseClientBuilder = Callable[ + [str, HTTPConfig, float, Config, SelectorStore], + Tuple[SSEClient, Optional[urllib3.PoolManager]], +] def create_sse_client( @@ -66,10 +70,17 @@ def create_sse_client( initial_reconnect_delay: float, config: Config, ss: SelectorStore -) -> SSEClient: +) -> Tuple[SSEClient, Optional[urllib3.PoolManager]]: """ " create_sse_client creates an SSEClient instance configured to connect - to the LaunchDarkly streaming endpoint. + to the LaunchDarkly streaming endpoint, along with the urllib3 PoolManager + backing it. The pool is returned alongside the client so the caller can + force-close any pooled connections on shutdown -- ``SSEClient.close()`` + only releases the connection back to the pool via ``urllib3.HTTPResponse + .shutdown()`` (which performs a half-close on the local read side) plus + ``release_conn()``, neither of which actually closes the underlying TCP + socket on Python 3.10. Closing the pool ensures the server observes the + client's disconnect when the FDv1 Fallback Directive engages. """ uri = base_uri + STREAMING_ENDPOINT if config.payload_filter_key is not None: @@ -87,11 +98,12 @@ def query_params() -> dict[str, str]: selector = ss.selector() return {"basis": selector.state} if selector.is_defined() else {} - return SSEClient( + pool = stream_http_factory.create_pool_manager(1, uri) + sse_client = SSEClient( connect=ConnectStrategy.http( url=uri, headers=base_headers, - pool=stream_http_factory.create_pool_manager(1, uri), + pool=pool, urllib3_request_options={"timeout": stream_http_factory.timeout}, query_params=query_params ), @@ -106,6 +118,31 @@ def query_params() -> dict[str, str]: retry_delay_reset_threshold=BACKOFF_RESET_INTERVAL, logger=log, ) + return sse_client, pool + + +def _close_pool_manager(pool: Optional[urllib3.PoolManager]) -> None: + """Close every pooled connection in ``pool`` so the underlying TCP sockets + are torn down. ``HTTPConnectionPool.close()`` drains its queue and calls + ``conn.close()`` on each connection, which sends the FIN that the server + is waiting on. ``PoolManager.clear()`` alone doesn't do this -- it just + drops the dict of pools without closing the connections inside them.""" + if pool is None: + return + try: + # ``RecentlyUsedContainer`` deliberately disallows iteration; ``keys()`` + # returns a thread-safe snapshot. We look each one up to close its + # underlying ``HTTPConnectionPool``. + for key in list(pool.pools.keys()): + try: + connection_pool = pool.pools.get(key) + if connection_pool is not None: + connection_pool.close() + except Exception: # pylint: disable=broad-except + log.debug("Error closing streaming connection pool", exc_info=True) + pool.clear() + except Exception: # pylint: disable=broad-except + log.debug("Error closing streaming pool manager", exc_info=True) class StreamingDataSource(Synchronizer, DiagnosticSource): @@ -129,6 +166,7 @@ def __init__(self, self._sse_client_builder = create_sse_client self._config = config self._sse: Optional[SSEClient] = None + self._sse_pool: Optional[urllib3.PoolManager] = None self._running = False self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None self._connection_attempt_start_time: Optional[float] = None @@ -149,13 +187,19 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: Update objects until the connection is closed or an unrecoverable error occurs. """ - self._sse = self._sse_client_builder( + builder_result = self._sse_client_builder( self.__uri, self.__http_options, self.__initial_reconnect_delay, self._config, ss ) + # Tests may inject a builder that returns either an SSEClient directly + # or a (client, pool) tuple. Accept both. + if isinstance(builder_result, tuple): + self._sse, self._sse_pool = builder_result + else: + self._sse, self._sse_pool = builder_result, None if self._sse is None: log.error("Failed to create SSE client for streaming updates.") @@ -166,6 +210,11 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: self._connection_attempt_start_time = time() envid = None + # fallback_requested is set when a Start action carries + # X-LD-FD-Fallback: true. We finish applying the current payload + # before halting, so consumers can serve the server-provided data + # while FDv1 takes over. + fallback_requested = False for action in self._sse.all: if isinstance(action, Fault): # If the SSE client detects the stream has closed, then it will @@ -186,17 +235,9 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: continue if isinstance(action, Start) and action.headers is not None: - fallback = action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true' envid = action.headers.get(_LD_ENVID_HEADER, envid) - - if fallback: - self._record_stream_init(True) - yield Update( - state=DataSourceState.OFF, - revert_to_fdv1=True, - environment_id=envid, - ) - break + if action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true': + fallback_requested = True if not isinstance(action, Event): continue @@ -206,6 +247,18 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: if update is not None: self._record_stream_init(False) self._connection_attempt_start_time = None + if fallback_requested: + # Decorate the completed update with the fallback signal, + # then halt — the consumer will switch to FDv1. + update = Update( + state=update.state, + change_set=update.change_set, + error=update.error, + fallback_to_fdv1=True, + environment_id=update.environment_id, + ) + yield update + break yield update except json.decoder.JSONDecodeError as e: log.info( @@ -229,11 +282,18 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: error=DataSourceErrorInfo( DataSourceErrorKind.UNKNOWN, 0, time(), str(e) ), - revert_to_fdv1=False, + fallback_to_fdv1=False, environment_id=envid, ) self._sse.close() + # Force-close the underlying urllib3 pool. SSEClient.close() only does a + # half-close on the local read side and releases the connection back to + # the pool, which on Python 3.10 leaves the TCP socket open until the + # response object is garbage-collected. The FDv1 Fallback Directive + # requires the Primary Synchronizer to be terminated promptly, so we + # tear down the pool here to send the FIN the server is waiting on. + _close_pool_manager(self._sse_pool) def stop(self): """ @@ -243,6 +303,10 @@ def stop(self): self._running = False if self._sse: self._sse.close() + # See _close_pool_manager docstring: this is what actually severs the + # TCP connection. ``stop()`` may be called from a different thread than + # the one running ``sync()``; close() is idempotent on the pool. + _close_pool_manager(self._sse_pool) def _record_stream_init(self, failed: bool): if self._diagnostic_accumulator and self._connection_attempt_start_time: @@ -353,7 +417,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona error=DataSourceErrorInfo( DataSourceErrorKind.INVALID_DATA, 0, time(), str(error) ), - revert_to_fdv1=False, + fallback_to_fdv1=False, environment_id=envid, ) return (update, True) @@ -377,7 +441,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona update = Update( state=DataSourceState.OFF, error=error_info, - revert_to_fdv1=True, + fallback_to_fdv1=True, environment_id=envid, ) self.stop() @@ -394,7 +458,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona else DataSourceState.OFF ), error=error_info, - revert_to_fdv1=False, + fallback_to_fdv1=False, environment_id=envid, ) @@ -416,7 +480,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona error=DataSourceErrorInfo( DataSourceErrorKind.UNKNOWN, 0, time(), str(error) ), - revert_to_fdv1=False, + fallback_to_fdv1=False, environment_id=envid, ) # no stacktrace here because, for a typical connection error, it'll diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index 97f207f3..682167fd 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -18,9 +18,10 @@ from ldclient.impl.listeners import Listeners from ldclient.impl.repeating_task import RepeatingTask from ldclient.impl.rwlock import ReadWriteLock -from ldclient.impl.util import _Fail, log +from ldclient.impl.util import _LD_FD_FALLBACK_HEADER, _Fail, log from ldclient.interfaces import ( DataSourceErrorInfo, + DataSourceErrorKind, DataSourceState, DataSourceStatus, DataSourceStatusProvider, @@ -276,7 +277,7 @@ class ConditionDirective(str, Enum): FDV1 = "fdv1" """ - FDV1 suggests that we should immediately revert to the FDv1 fallback synchronizer. + FDV1 suggests that we should immediately fall back to the FDv1 Fallback Synchronizer. """ @@ -403,7 +404,26 @@ def _run_main_loop(self, set_on_ready: Event): ) # Run initializers first - self._run_initializers(set_on_ready) + fallback_requested = self._run_initializers(set_on_ready) + + # If an initializer asked the SDK to fall back to FDv1, halt the + # configured FDv2 chain and switch terminally to the FDv1 Fallback + # Synchronizer (or transition to OFF if none is configured). + if fallback_requested: + if self._fdv1_fallback_synchronizer_builder is not None: + log.warning("Falling back to FDv1 protocol") + self._synchronizers = [self._fdv1_fallback_synchronizer_builder] + else: + log.warning( + "Initializer requested FDv1 fallback but none configured" + ) + self._synchronizers = [] + self._data_source_status_provider.update_status( + DataSourceState.OFF, + self._data_source_status_provider.status.error, + ) + set_on_ready.set() + return # Run synchronizers self._run_synchronizers(set_on_ready) @@ -414,14 +434,22 @@ def _run_main_loop(self, set_on_ready: Event): if not set_on_ready.is_set(): set_on_ready.set() - def _run_initializers(self, set_on_ready: Event): - """Run initializers to get initial data.""" + def _run_initializers(self, set_on_ready: Event) -> bool: + """ + Run initializers to get initial data. + + Returns True when an initializer requested the FDv1 Fallback Directive + (via the X-LD-FD-Fallback response header). When that happens, any + accompanying payload is applied first so evaluations can serve the + server-provided data while the FDv1 synchronizer spins up; the caller + is then responsible for switching to the FDv1 Fallback Synchronizer. + """ if self._data_system_config.initializers is None: - return + return False for initializer_builder in self._data_system_config.initializers: if self._stop_event.is_set(): - return + return False try: initializer = initializer_builder.build(self._config) @@ -431,6 +459,25 @@ def _run_initializers(self, set_on_ready: Event): if isinstance(basis_result, _Fail): log.warning("Initializer %s failed: %s", initializer.name, basis_result.error) + # An error response can still carry the FDv1 fallback directive. + if basis_result.headers is not None and \ + basis_result.headers.get(_LD_FD_FALLBACK_HEADER) == 'true': + log.warning( + "Initializer %s requested fallback to FDv1 protocol", + initializer.name, + ) + # Surface the underlying error on the status so + # programmatic monitors can see why FDv2 shut down. + self._data_source_status_provider.update_status( + DataSourceState.INITIALIZING, + DataSourceErrorInfo( + kind=DataSourceErrorKind.UNKNOWN, + status_code=0, + time=time.time(), + message=basis_result.error, + ), + ) + return True continue basis = basis_result.value @@ -442,9 +489,19 @@ def _run_initializers(self, set_on_ready: Event): # Set ready event if an only if a selector is defined for the changeset if basis.change_set.selector.is_defined(): set_on_ready.set() - return + + if basis.fallback_to_fdv1: + log.warning( + "Initializer %s requested fallback to FDv1 protocol", + initializer.name, + ) + return True + + if basis.change_set.selector.is_defined(): + return False except Exception as e: log.error("Initializer failed with exception: %s", e) + return False def _run_synchronizers(self, set_on_ready: Event): """Run synchronizers to keep data up-to-date.""" @@ -476,12 +533,12 @@ def synchronizer_loop(self: 'FDv2'): if directive == ConditionDirective.FDV1: # Abandon all synchronizers and use only fdv1 fallback - log.info("Reverting to FDv1 fallback synchronizer") + log.warning("Falling back to FDv1 protocol") if self._fdv1_fallback_synchronizer_builder is not None: synchronizers_list = [self._fdv1_fallback_synchronizer_builder] current_index = 0 else: - log.warning("No FDv1 fallback synchronizer available") + log.warning("Synchronizer requested FDv1 fallback but none configured") synchronizers_list = [] self._data_source_status_provider.update_status( DataSourceState.OFF, @@ -608,8 +665,11 @@ def reader(self: 'FDv2'): # Update status self._data_source_status_provider.update_status(update.state, update.error) - # Check if we should revert to FDv1 immediately - if update.revert_to_fdv1: + # Check if we should fall back to FDv1 immediately. fallback_to_fdv1 + # may ride along on a Valid update (payload + directive in the same + # response), in which case the ChangeSet has already been applied + # above before we hand off. + if update.fallback_to_fdv1: return ConditionDirective.FDV1 # Check for OFF state indicating permanent failure diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index 62e20527..15d0832c 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -1366,6 +1366,13 @@ class Basis: change_set: ChangeSet persist: bool environment_id: Optional[str] = None + fallback_to_fdv1: bool = False + """ + Indicates that the LaunchDarkly server has directed the SDK to fall + back to the FDv1 Fallback Synchronizer (via the X-LD-FD-Fallback + response header). When True, callers must apply ``change_set`` first + and then terminally switch to the FDv1 Fallback Synchronizer. + """ class ChangeSetBuilder: @@ -1647,7 +1654,14 @@ class Update: state: DataSourceState change_set: Optional[ChangeSet] = None error: Optional[DataSourceErrorInfo] = None - revert_to_fdv1: bool = False + fallback_to_fdv1: bool = False + """ + Indicates that the LaunchDarkly server has directed the SDK to fall + back to the FDv1 Fallback Synchronizer (via the X-LD-FD-Fallback + response header). When True, callers must apply any accompanying + ``change_set`` first and then terminally switch to the FDv1 Fallback + Synchronizer. The flag may ride along on Valid or Off updates. + """ environment_id: Optional[str] = None diff --git a/ldclient/testing/impl/datasourcev2/test_polling_initializer.py b/ldclient/testing/impl/datasourcev2/test_polling_initializer.py index 831844d7..d59cc7df 100644 --- a/ldclient/testing/impl/datasourcev2/test_polling_initializer.py +++ b/ldclient/testing/impl/datasourcev2/test_polling_initializer.py @@ -10,7 +10,12 @@ fdv1_polling_payload_to_changeset, polling_payload_to_changeset ) -from ldclient.impl.util import UnsuccessfulResponseException, _Fail, _Success +from ldclient.impl.util import ( + _LD_FD_FALLBACK_HEADER, + UnsuccessfulResponseException, + _Fail, + _Success +) from ldclient.interfaces import ChangeSetBuilder, IntentCode from ldclient.testing.mock_components import MockSelectorStore @@ -141,6 +146,58 @@ def test_handles_transfer_changes(): assert result.value.persist is True +def test_initializer_carries_fallback_signal_on_successful_response(): + """A 200 OK with X-LD-FD-Fallback: true must produce a Basis with + fallback_to_fdv1=True so the caller can apply the payload first and then + switch terminally to the FDv1 Fallback Synchronizer.""" + change_set = ChangeSetBuilder.no_changes() + headers = {_LD_FD_FALLBACK_HEADER: 'true'} + mock_requester = MockPollingRequester(_Success(value=(change_set, headers))) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch(MockSelectorStore(Selector.no_selector())) + + assert isinstance(result, _Success) + assert result.value is not None + assert result.value.fallback_to_fdv1 is True + + +def test_initializer_propagates_fallback_signal_on_error_response(): + """An error response carrying X-LD-FD-Fallback: true must propagate the + response headers in the _Fail so the FDv2 datasystem can detect the + directive even when no payload was delivered.""" + headers = {_LD_FD_FALLBACK_HEADER: 'true'} + mock_requester = MockPollingRequester( + _Fail( + error="failure message", + exception=UnsuccessfulResponseException(500), + headers=headers, + ) + ) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch(MockSelectorStore(Selector.no_selector())) + + assert isinstance(result, _Fail) + assert result.headers is not None + assert result.headers.get(_LD_FD_FALLBACK_HEADER) == 'true' + + +def test_initializer_basis_default_fallback_is_false(): + """A response without the fallback header must produce a Basis with + fallback_to_fdv1=False.""" + change_set = ChangeSetBuilder.no_changes() + headers: dict = {} + mock_requester = MockPollingRequester(_Success(value=(change_set, headers))) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch(MockSelectorStore(Selector.no_selector())) + + assert isinstance(result, _Success) + assert result.value is not None + assert result.value.fallback_to_fdv1 is False + + def test_handles_fdv1_payload(): """Test that FDv1 payloads have persist set to False.""" fdv1_data = { diff --git a/ldclient/testing/impl/datasourcev2/test_polling_synchronizer.py b/ldclient/testing/impl/datasourcev2/test_polling_synchronizer.py index 7bc1d79d..8a7d751a 100644 --- a/ldclient/testing/impl/datasourcev2/test_polling_synchronizer.py +++ b/ldclient/testing/impl/datasourcev2/test_polling_synchronizer.py @@ -116,7 +116,7 @@ def test_handles_no_changes(): assert valid.state == DataSourceState.VALID assert valid.error is None - assert valid.revert_to_fdv1 is False + assert valid.fallback_to_fdv1 is False assert valid.environment_id is None assert valid.change_set is not None assert valid.change_set.intent_code == IntentCode.TRANSFER_NONE @@ -137,7 +137,7 @@ def test_handles_empty_changeset(): assert valid.state == DataSourceState.VALID assert valid.error is None - assert valid.revert_to_fdv1 is False + assert valid.fallback_to_fdv1 is False assert valid.environment_id is None assert valid.change_set is not None @@ -165,7 +165,7 @@ def test_handles_put_objects(): assert valid.state == DataSourceState.VALID assert valid.error is None - assert valid.revert_to_fdv1 is False + assert valid.fallback_to_fdv1 is False assert valid.environment_id is None assert valid.change_set is not None @@ -196,7 +196,7 @@ def test_handles_delete_objects(): assert valid.state == DataSourceState.VALID assert valid.error is None - assert valid.revert_to_fdv1 is False + assert valid.fallback_to_fdv1 is False assert valid.environment_id is None assert valid.change_set is not None @@ -234,7 +234,7 @@ def test_generic_error_interrupts_and_recovers(): assert interrupted.error.kind == DataSourceErrorKind.NETWORK_ERROR assert interrupted.error.status_code == 0 assert interrupted.error.message == "error for test" - assert interrupted.revert_to_fdv1 is False + assert interrupted.fallback_to_fdv1 is False assert interrupted.environment_id is None assert valid.change_set is not None @@ -267,12 +267,12 @@ def test_recoverable_error_continues(): assert interrupted.error is not None assert interrupted.error.kind == DataSourceErrorKind.ERROR_RESPONSE assert interrupted.error.status_code == 408 - assert interrupted.revert_to_fdv1 is False + assert interrupted.fallback_to_fdv1 is False assert interrupted.environment_id is None assert valid.state == DataSourceState.VALID assert valid.error is None - assert valid.revert_to_fdv1 is False + assert valid.fallback_to_fdv1 is False assert valid.environment_id is None assert valid.change_set is not None @@ -303,7 +303,7 @@ def test_unrecoverable_error_shuts_down(): assert off.error is not None assert off.error.kind == DataSourceErrorKind.ERROR_RESPONSE assert off.error.status_code == 401 - assert off.revert_to_fdv1 is False + assert off.fallback_to_fdv1 is False assert off.environment_id is None assert off.change_set is None @@ -328,7 +328,7 @@ def test_envid_from_success_headers(): assert valid.state == DataSourceState.VALID assert valid.error is None - assert valid.revert_to_fdv1 is False + assert valid.fallback_to_fdv1 is False assert valid.environment_id == 'test-env-polling-123' @@ -370,7 +370,7 @@ def test_envid_from_fallback_headers(): valid = next(synchronizer.sync(MockSelectorStore(Selector.no_selector()))) assert valid.state == DataSourceState.VALID - assert valid.revert_to_fdv1 is True + assert valid.fallback_to_fdv1 is True assert valid.environment_id == 'test-env-fallback' @@ -449,7 +449,7 @@ def test_envid_from_error_with_fallback(): off = next(sync) assert off.state == DataSourceState.OFF - assert off.revert_to_fdv1 is True + assert off.fallback_to_fdv1 is True assert off.environment_id == 'test-env-503' @@ -478,3 +478,27 @@ def test_envid_from_generic_error_with_headers(): assert interrupted.error.kind == DataSourceErrorKind.NETWORK_ERROR assert valid.state == DataSourceState.VALID + + +def test_synchronizer_generic_error_with_fallback_header_halts(): + """A non-HTTP error (e.g. malformed JSON) carrying X-LD-FD-Fallback: true + must halt the synchronizer and emit OFF + fallback_to_fdv1=True rather + than retrying the FDv2 endpoint.""" + headers_error = { + _LD_ENVID_HEADER: 'test-env-generic', + _LD_FD_FALLBACK_HEADER: 'true', + } + _failure = _Fail(error="malformed body", headers=headers_error) + + synchronizer = PollingDataSource( + poll_interval=0.01, + requester=ListBasedRequester(results=iter([_failure])), + ) + updates = list(synchronizer.sync(MockSelectorStore(Selector.no_selector()))) + + assert len(updates) == 1 + assert updates[0].state == DataSourceState.OFF + assert updates[0].fallback_to_fdv1 is True + assert updates[0].environment_id == 'test-env-generic' + assert updates[0].error is not None + assert updates[0].error.kind == DataSourceErrorKind.NETWORK_ERROR diff --git a/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py b/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py index 559e2c24..62fb0964 100644 --- a/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py +++ b/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py @@ -202,7 +202,7 @@ def test_handles_no_changes(): assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is None @@ -222,7 +222,7 @@ def test_handles_empty_changeset(events): # pylint: disable=redefined-outer-nam assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is not None @@ -249,7 +249,7 @@ def test_handles_put_objects(events): # pylint: disable=redefined-outer-name assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is not None @@ -281,7 +281,7 @@ def test_handles_delete_objects(events): # pylint: disable=redefined-outer-name assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is not None @@ -312,7 +312,7 @@ def test_swallows_goodbye(events): # pylint: disable=redefined-outer-name assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is not None @@ -339,7 +339,7 @@ def test_swallows_heartbeat(events): # pylint: disable=redefined-outer-name assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is not None @@ -368,7 +368,7 @@ def test_error_resets(events): # pylint: disable=redefined-outer-name assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is not None @@ -392,7 +392,7 @@ def test_handles_out_of_order(events): # pylint: disable=redefined-outer-name assert len(updates) == 1 assert updates[0].state == DataSourceState.INTERRUPTED assert updates[0].change_set is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].error is not None @@ -423,7 +423,7 @@ def test_invalid_json_decoding(events): # pylint: disable=redefined-outer-name assert len(updates) == 2 assert updates[0].state == DataSourceState.INTERRUPTED assert updates[0].change_set is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].error is not None @@ -458,7 +458,7 @@ def test_stops_on_unrecoverable_status_code( assert len(updates) == 1 assert updates[0].state == DataSourceState.OFF assert updates[0].change_set is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].error is not None @@ -583,8 +583,10 @@ def test_envid_preserved_across_events(events): # pylint: disable=redefined-out assert len(updates[0].change_set.changes) == 1 -def test_envid_from_fallback_header(): - """Test that environment ID is captured when fallback header is present""" +def test_fallback_header_with_no_payload_emits_no_update(): + """A Start carrying X-LD-FD-Fallback with no following payload events + must not synthesize an Update. The directive only fires once a payload + has been applied or an error has been observed.""" start_action = Start(headers={_LD_ENVID_HEADER: 'test-env-fallback', _LD_FD_FALLBACK_HEADER: 'true'}) builder = list_sse_client([start_action]) @@ -593,10 +595,94 @@ def test_envid_from_fallback_header(): synchronizer._sse_client_builder = builder updates = list(synchronizer.sync(MockSelectorStore(Selector.no_selector()))) + assert updates == [] + + +def test_fallback_header_with_payload_emits_valid_with_fallback(events): # pylint: disable=redefined-outer-name + """When the response carries X-LD-FD-Fallback: true and a valid SSE + payload, the synchronizer must apply the payload and then emit a single + Valid update with fallback_to_fdv1=True so the consumer can hand off to + the FDv1 Fallback Synchronizer.""" + start_action = Start(headers={_LD_ENVID_HEADER: 'test-env-fallback', _LD_FD_FALLBACK_HEADER: 'true'}) + + builder = list_sse_client( + [ + start_action, + events[EventName.SERVER_INTENT], + events[EventName.PUT_OBJECT], + events[EventName.PAYLOAD_TRANSFERRED], + ] + ) + + synchronizer = make_streaming_data_source() + synchronizer._sse_client_builder = builder + updates = list(synchronizer.sync(MockSelectorStore(Selector.no_selector()))) + assert len(updates) == 1 - assert updates[0].state == DataSourceState.OFF - assert updates[0].revert_to_fdv1 is True + assert updates[0].state == DataSourceState.VALID + assert updates[0].fallback_to_fdv1 is True assert updates[0].environment_id == 'test-env-fallback' + assert updates[0].change_set is not None + assert len(updates[0].change_set.changes) == 1 + + +def test_streaming_closes_underlying_pool_on_fallback(events): # pylint: disable=redefined-outer-name + """When the FDv1 Fallback Directive engages, the underlying urllib3 + connection pool must be torn down so the FDv2 streaming TCP connection + is actually closed. ``SSEClient.close()`` only releases the connection + back to the pool via a half-close; on Python 3.10 that leaves the socket + open until GC, which the spec forbids -- the Primary Synchronizer must + be terminated promptly when the directive fires.""" + pool_close_calls = [] + + class TrackingPool: + """Stand-in PoolManager that records calls to clear() and exposes a + keys()-iterable pools attribute matching urllib3's RecentlyUsedContainer.""" + + def __init__(self): + self.cleared = False + self.connection_pool = TrackingConnectionPool() + self.pools = TrackingPoolDict({"key": self.connection_pool}) + + def clear(self): + self.cleared = True + + class TrackingConnectionPool: + def __init__(self): + self.closed = False + + def close(self): + self.closed = True + pool_close_calls.append(self) + + class TrackingPoolDict: + def __init__(self, items): + self._items = items + + def keys(self): + return list(self._items.keys()) + + def get(self, key): + return self._items.get(key) + + tracking_pool = TrackingPool() + + def builder(*_args, **_kwargs): + return ListBasedSseClient([ + Start(headers={_LD_FD_FALLBACK_HEADER: 'true'}), + events[EventName.SERVER_INTENT], + events[EventName.PUT_OBJECT], + events[EventName.PAYLOAD_TRANSFERRED], + ]), tracking_pool + + synchronizer = make_streaming_data_source() + synchronizer._sse_client_builder = builder + updates = list(synchronizer.sync(MockSelectorStore(Selector.no_selector()))) + + assert len(updates) == 1 + assert updates[0].fallback_to_fdv1 is True + assert tracking_pool.cleared is True + assert tracking_pool.connection_pool.closed is True def test_envid_from_fault_action(): @@ -655,7 +741,7 @@ def test_envid_from_fault_with_fallback(): assert len(updates) == 1 assert updates[0].state == DataSourceState.OFF - assert updates[0].revert_to_fdv1 is True + assert updates[0].fallback_to_fdv1 is True assert updates[0].environment_id == 'test-env-503' diff --git a/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py b/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py index d49a82aa..8e885e3a 100644 --- a/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py +++ b/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py @@ -11,11 +11,20 @@ from ldclient.datasystem import file_ds_builder from ldclient.impl.datasystem import DataAvailability from ldclient.impl.datasystem.fdv2 import FDv2 +from ldclient.impl.util import _LD_FD_FALLBACK_HEADER, _Fail, _Success from ldclient.integrations.test_datav2 import TestDataV2 from ldclient.interfaces import ( + Basis, + BasisResult, + ChangeSetBuilder, DataSourceState, DataSourceStatus, FlagChange, + Initializer, + IntentCode, + ObjectKind, + Selector, + SelectorStore, Synchronizer, Update ) @@ -32,6 +41,31 @@ def build(self, config: Config) -> Synchronizer: # pylint: disable=unused-argum return self._mock +class _StaticInitializer(Initializer): + """A test initializer that returns a fixed BasisResult.""" + + def __init__(self, name: str, result: BasisResult): + self._name = name + self._result = result + + @property + def name(self) -> str: + return self._name + + def fetch(self, ss: SelectorStore) -> BasisResult: # pylint: disable=unused-argument + return self._result + + +class _InitializerBuilder(DataSourceBuilder): # pylint: disable=too-few-public-methods + """Wraps a static Initializer as a DataSourceBuilder.""" + + def __init__(self, initializer: Initializer): + self._initializer = initializer + + def build(self, config: Config) -> Initializer: # pylint: disable=unused-argument + return self._initializer + + def test_two_phase_init(): td_initializer = TestDataV2.data_source() td_initializer.update(td_initializer.flag("feature-flag").on(True)) @@ -196,11 +230,11 @@ def test_fdv2_falls_back_to_fdv1_on_polling_error_with_header(): mock_primary.name = "mock-primary" mock_primary.stop = Mock() - # Simulate a synchronizer that yields an OFF state with revert_to_fdv1=True + # Simulate a synchronizer that yields an OFF state with fallback_to_fdv1=True mock_primary.sync.return_value = iter([ Update( state=DataSourceState.OFF, - revert_to_fdv1=True + fallback_to_fdv1=True ) ]) @@ -250,7 +284,7 @@ def test_fdv2_falls_back_to_fdv1_on_polling_success_with_header(): mock_primary.sync.return_value = iter([ Update( state=DataSourceState.VALID, - revert_to_fdv1=True + fallback_to_fdv1=True ) ]) @@ -304,7 +338,7 @@ def test_fdv2_falls_back_to_fdv1_with_initializer(): mock_primary.sync.return_value = iter([ Update( state=DataSourceState.OFF, - revert_to_fdv1=True + fallback_to_fdv1=True ) ]) @@ -353,7 +387,7 @@ def test_fdv2_no_fallback_without_header(): mock_primary.sync.return_value = iter([ Update( state=DataSourceState.INTERRUPTED, - revert_to_fdv1=False # No fallback + fallback_to_fdv1=False # No fallback ) ]) @@ -364,7 +398,7 @@ def test_fdv2_no_fallback_without_header(): mock_secondary.sync.return_value = iter([ Update( state=DataSourceState.VALID, - revert_to_fdv1=False + fallback_to_fdv1=False ) ]) @@ -407,7 +441,7 @@ def test_fdv2_stays_on_fdv1_after_fallback(): mock_primary.sync.return_value = iter([ Update( state=DataSourceState.OFF, - revert_to_fdv1=True + fallback_to_fdv1=True ) ]) @@ -548,3 +582,171 @@ def listener(_: FlagChange): fdv2.stop() finally: os.remove(path) + + +def _basis_with_one_flag(flag_key: str, fallback_to_fdv1: bool) -> Basis: + """Builds a Basis containing a single flag, optionally carrying the FDv1 fallback signal.""" + builder = ChangeSetBuilder() + builder.start(IntentCode.TRANSFER_FULL) + builder.add_put( + ObjectKind.FLAG, + flag_key, + 1, + {"key": flag_key, "version": 1, "on": True, "variations": [True, False]}, + ) + change_set = builder.finish(Selector(state="initializer-state", version=1)) + return Basis( + change_set=change_set, + persist=True, + environment_id=None, + fallback_to_fdv1=fallback_to_fdv1, + ) + + +def test_fdv2_initializer_fallback_with_payload_engages_fdv1_synchronizer(): + """ + When an initializer returns a successful Basis carrying the FDv1 fallback + signal, the SDK must apply the payload, skip configured FDv2 synchronizers, + and run the FDv1 Fallback Synchronizer instead. + """ + init = _StaticInitializer( + "fallback-initializer", + _Success(value=_basis_with_one_flag("init-flag", fallback_to_fdv1=True)), + ) + + # FDv2 streaming synchronizer that should never produce updates because we + # were directed to fall back during initialization. + fdv2_sync_mock: Synchronizer = Mock() + fdv2_sync_mock.name = "fdv2-sync-should-not-run" + fdv2_sync_mock.stop = Mock() + fdv2_sync_mock.sync.return_value = iter([]) + + td_fdv1 = TestDataV2.data_source() + td_fdv1.update(td_fdv1.flag("fdv1-flag").on(True)) + + data_system_config = DataSystemConfig( + initializers=[_InitializerBuilder(init)], + synchronizers=[MockDataSourceBuilder(fdv2_sync_mock)], + fdv1_fallback_synchronizer=td_fdv1.builder, + ) + + fdv1_flag_seen = Event() + init_flag_seen = Event() + + def listener(flag_change: FlagChange): + if flag_change.key == "init-flag": + init_flag_seen.set() + elif flag_change.key == "fdv1-flag": + fdv1_flag_seen.set() + + set_on_ready = Event() + fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config) + fdv2.flag_change_listeners.add(listener) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + # The initializer's payload must be applied before the handoff. + assert init_flag_seen.wait(1), "Initializer payload was not applied before fallback" + # The configured FDv2 synchronizer must not run after a directive. + fdv2_sync_mock.sync.assert_not_called() + # And the FDv1 Fallback Synchronizer must take over. + assert fdv1_flag_seen.wait(1), "FDv1 fallback synchronizer did not run after directive" + + +def test_fdv2_initializer_fallback_without_fdv1_configured_transitions_to_off(): + """ + When an initializer signals FDv1 fallback but no FDv1 Fallback Synchronizer + is configured, the data source status must transition to OFF rather than + silently dropping the directive or stalling at INITIALIZING. + """ + init = _StaticInitializer( + "fallback-initializer-no-fdv1", + _Fail( + error="boom", + exception=None, + headers={_LD_FD_FALLBACK_HEADER: 'true'}, + ), + ) + + # An FDv2 synchronizer that would otherwise be tried -- it must not run. + fdv2_sync_mock: Synchronizer = Mock() + fdv2_sync_mock.name = "fdv2-sync-should-not-run" + fdv2_sync_mock.stop = Mock() + fdv2_sync_mock.sync.return_value = iter([]) + + data_system_config = DataSystemConfig( + initializers=[_InitializerBuilder(init)], + synchronizers=[MockDataSourceBuilder(fdv2_sync_mock)], + fdv1_fallback_synchronizer=None, + ) + + off = Event() + + def listener(status: DataSourceStatus): + if status.state == DataSourceState.OFF: + off.set() + + set_on_ready = Event() + fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config) + fdv2.data_source_status_provider.add_listener(listener) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + assert off.wait(1), "Data source did not transition to OFF after directive without fallback" + fdv2_sync_mock.sync.assert_not_called() + + +def test_fdv2_synchronizer_fallback_on_success_with_payload(): + """ + When a synchronizer emits a Valid update carrying both a ChangeSet and the + FDv1 fallback signal, the SDK must apply the payload before terminally + handing off to the FDv1 Fallback Synchronizer. + """ + builder = ChangeSetBuilder() + builder.start(IntentCode.TRANSFER_FULL) + builder.add_put( + ObjectKind.FLAG, + "fdv2-payload-flag", + 1, + {"key": "fdv2-payload-flag", "version": 1, "on": True, "variations": [True, False]}, + ) + change_set = builder.finish(Selector(state="state", version=1)) + + mock_primary: Synchronizer = Mock() + mock_primary.name = "mock-primary" + mock_primary.stop = Mock() + mock_primary.sync.return_value = iter([ + Update( + state=DataSourceState.VALID, + change_set=change_set, + fallback_to_fdv1=True, + ) + ]) + + td_fdv1 = TestDataV2.data_source() + td_fdv1.update(td_fdv1.flag("fdv1-flag").on(True)) + + data_system_config = DataSystemConfig( + initializers=None, + synchronizers=[MockDataSourceBuilder(mock_primary)], + fdv1_fallback_synchronizer=td_fdv1.builder, + ) + + fdv1_flag_seen = Event() + payload_flag_seen = Event() + + def listener(flag_change: FlagChange): + if flag_change.key == "fdv2-payload-flag": + payload_flag_seen.set() + elif flag_change.key == "fdv1-flag": + fdv1_flag_seen.set() + + set_on_ready = Event() + fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config) + fdv2.flag_change_listeners.add(listener) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + # The Valid update's payload must be applied before the handoff. + assert payload_flag_seen.wait(1), "FDv2 payload was not applied before fallback" + assert fdv1_flag_seen.wait(1), "FDv1 fallback synchronizer did not run after directive"