From 0640b7225f76085372dfb36fde1658a0178809f5 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 30 Apr 2026 11:12:33 -0400 Subject: [PATCH 1/3] fix: honor server-directed FDv1 Fallback Directive in initializer phase Previously the X-LD-FD-Fallback response header was honored only in the synchronizer phase, and in the synchronizer phase a payload arriving alongside the directive was discarded because the streaming processor halted before applying it. Per the updated FDv2 Data System spec, the directive must be honored in both initializer and synchronizer phases, must take precedence over the section 1.2 failover algorithm, and must let any accompanying payload be applied before the SDK switches terminally to the FDv1 Fallback Synchronizer. Carry the signal explicitly on the public initializer/synchronizer result types -- Basis.fallback_to_fdv1 and Update.fallback_to_fdv1 -- so callers cannot silently drop it. Update.revert_to_fdv1 is renamed to Update.fallback_to_fdv1 to match the spec terminology. --- ldclient/impl/datasourcev2/polling.py | 23 +- ldclient/impl/datasourcev2/streaming.py | 39 ++-- ldclient/impl/datasystem/fdv2.py | 84 ++++++- ldclient/interfaces.py | 16 +- .../datasourcev2/test_polling_initializer.py | 59 ++++- .../datasourcev2/test_polling_synchronizer.py | 46 +++- .../test_streaming_synchronizer.py | 57 +++-- .../impl/datasystem/test_fdv2_datasystem.py | 216 +++++++++++++++++- 8 files changed, 475 insertions(+), 65 deletions(-) diff --git a/ldclient/impl/datasourcev2/polling.py b/ldclient/impl/datasourcev2/polling.py index ff9edecd..326a3e2b 100644 --- a/ldclient/impl/datasourcev2/polling.py +++ b/ldclient/impl/datasourcev2/polling.py @@ -139,7 +139,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: yield Update( state=DataSourceState.OFF, error=error_info, - revert_to_fdv1=True, + fallback_to_fdv1=True, environment_id=envid, ) break @@ -168,6 +168,17 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: message=result.error, ) + # Even a non-HTTP error (e.g. malformed JSON) can carry the fallback + # header. If so, halt rather than retrying the FDv2 endpoint. + if fallback: + yield Update( + state=DataSourceState.OFF, + error=error_info, + fallback_to_fdv1=True, + environment_id=envid, + ) + break + yield Update( state=DataSourceState.INTERRUPTED, error=error_info, @@ -179,7 +190,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: state=DataSourceState.VALID, change_set=change_set, environment_id=headers.get(_LD_ENVID_HEADER), - revert_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true' + fallback_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true' ) if self._interrupt_event.wait(self._poll_interval): @@ -204,13 +215,18 @@ def _poll(self, ss: SelectorStore) -> BasisResult: if is_http_error_recoverable(status_code): log.warning(http_error_message_result) + # Forward any response headers so callers (e.g. FDv2 datasystem) + # can read the X-LD-FD-Fallback directive even on error. return _Fail( - error=http_error_message_result, exception=result.exception + error=http_error_message_result, + exception=result.exception, + headers=result.headers, ) return _Fail( error=result.error or "Failed to request payload", exception=result.exception, + headers=result.headers, ) (change_set, headers) = result.value @@ -223,6 +239,7 @@ def _poll(self, ss: SelectorStore) -> BasisResult: change_set=change_set, persist=change_set.selector.is_defined(), environment_id=env_id, + fallback_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true', ) return _Success(value=basis) diff --git a/ldclient/impl/datasourcev2/streaming.py b/ldclient/impl/datasourcev2/streaming.py index 065f7917..47e89927 100644 --- a/ldclient/impl/datasourcev2/streaming.py +++ b/ldclient/impl/datasourcev2/streaming.py @@ -166,6 +166,11 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: self._connection_attempt_start_time = time() envid = None + # fallback_requested is set when a Start action carries + # X-LD-FD-Fallback: true. We finish applying the current payload + # before halting, so consumers can serve the server-provided data + # while FDv1 takes over. + fallback_requested = False for action in self._sse.all: if isinstance(action, Fault): # If the SSE client detects the stream has closed, then it will @@ -186,17 +191,9 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: continue if isinstance(action, Start) and action.headers is not None: - fallback = action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true' envid = action.headers.get(_LD_ENVID_HEADER, envid) - - if fallback: - self._record_stream_init(True) - yield Update( - state=DataSourceState.OFF, - revert_to_fdv1=True, - environment_id=envid, - ) - break + if action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true': + fallback_requested = True if not isinstance(action, Event): continue @@ -206,6 +203,18 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: if update is not None: self._record_stream_init(False) self._connection_attempt_start_time = None + if fallback_requested: + # Decorate the completed update with the fallback signal, + # then halt — the consumer will switch to FDv1. + update = Update( + state=update.state, + change_set=update.change_set, + error=update.error, + fallback_to_fdv1=True, + environment_id=update.environment_id, + ) + yield update + break yield update except json.decoder.JSONDecodeError as e: log.info( @@ -229,7 +238,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: error=DataSourceErrorInfo( DataSourceErrorKind.UNKNOWN, 0, time(), str(e) ), - revert_to_fdv1=False, + fallback_to_fdv1=False, environment_id=envid, ) @@ -353,7 +362,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona error=DataSourceErrorInfo( DataSourceErrorKind.INVALID_DATA, 0, time(), str(error) ), - revert_to_fdv1=False, + fallback_to_fdv1=False, environment_id=envid, ) return (update, True) @@ -377,7 +386,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona update = Update( state=DataSourceState.OFF, error=error_info, - revert_to_fdv1=True, + fallback_to_fdv1=True, environment_id=envid, ) self.stop() @@ -394,7 +403,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona else DataSourceState.OFF ), error=error_info, - revert_to_fdv1=False, + fallback_to_fdv1=False, environment_id=envid, ) @@ -416,7 +425,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona error=DataSourceErrorInfo( DataSourceErrorKind.UNKNOWN, 0, time(), str(error) ), - revert_to_fdv1=False, + fallback_to_fdv1=False, environment_id=envid, ) # no stacktrace here because, for a typical connection error, it'll diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index 97f207f3..682167fd 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -18,9 +18,10 @@ from ldclient.impl.listeners import Listeners from ldclient.impl.repeating_task import RepeatingTask from ldclient.impl.rwlock import ReadWriteLock -from ldclient.impl.util import _Fail, log +from ldclient.impl.util import _LD_FD_FALLBACK_HEADER, _Fail, log from ldclient.interfaces import ( DataSourceErrorInfo, + DataSourceErrorKind, DataSourceState, DataSourceStatus, DataSourceStatusProvider, @@ -276,7 +277,7 @@ class ConditionDirective(str, Enum): FDV1 = "fdv1" """ - FDV1 suggests that we should immediately revert to the FDv1 fallback synchronizer. + FDV1 suggests that we should immediately fall back to the FDv1 Fallback Synchronizer. """ @@ -403,7 +404,26 @@ def _run_main_loop(self, set_on_ready: Event): ) # Run initializers first - self._run_initializers(set_on_ready) + fallback_requested = self._run_initializers(set_on_ready) + + # If an initializer asked the SDK to fall back to FDv1, halt the + # configured FDv2 chain and switch terminally to the FDv1 Fallback + # Synchronizer (or transition to OFF if none is configured). + if fallback_requested: + if self._fdv1_fallback_synchronizer_builder is not None: + log.warning("Falling back to FDv1 protocol") + self._synchronizers = [self._fdv1_fallback_synchronizer_builder] + else: + log.warning( + "Initializer requested FDv1 fallback but none configured" + ) + self._synchronizers = [] + self._data_source_status_provider.update_status( + DataSourceState.OFF, + self._data_source_status_provider.status.error, + ) + set_on_ready.set() + return # Run synchronizers self._run_synchronizers(set_on_ready) @@ -414,14 +434,22 @@ def _run_main_loop(self, set_on_ready: Event): if not set_on_ready.is_set(): set_on_ready.set() - def _run_initializers(self, set_on_ready: Event): - """Run initializers to get initial data.""" + def _run_initializers(self, set_on_ready: Event) -> bool: + """ + Run initializers to get initial data. + + Returns True when an initializer requested the FDv1 Fallback Directive + (via the X-LD-FD-Fallback response header). When that happens, any + accompanying payload is applied first so evaluations can serve the + server-provided data while the FDv1 synchronizer spins up; the caller + is then responsible for switching to the FDv1 Fallback Synchronizer. + """ if self._data_system_config.initializers is None: - return + return False for initializer_builder in self._data_system_config.initializers: if self._stop_event.is_set(): - return + return False try: initializer = initializer_builder.build(self._config) @@ -431,6 +459,25 @@ def _run_initializers(self, set_on_ready: Event): if isinstance(basis_result, _Fail): log.warning("Initializer %s failed: %s", initializer.name, basis_result.error) + # An error response can still carry the FDv1 fallback directive. + if basis_result.headers is not None and \ + basis_result.headers.get(_LD_FD_FALLBACK_HEADER) == 'true': + log.warning( + "Initializer %s requested fallback to FDv1 protocol", + initializer.name, + ) + # Surface the underlying error on the status so + # programmatic monitors can see why FDv2 shut down. + self._data_source_status_provider.update_status( + DataSourceState.INITIALIZING, + DataSourceErrorInfo( + kind=DataSourceErrorKind.UNKNOWN, + status_code=0, + time=time.time(), + message=basis_result.error, + ), + ) + return True continue basis = basis_result.value @@ -442,9 +489,19 @@ def _run_initializers(self, set_on_ready: Event): # Set ready event if an only if a selector is defined for the changeset if basis.change_set.selector.is_defined(): set_on_ready.set() - return + + if basis.fallback_to_fdv1: + log.warning( + "Initializer %s requested fallback to FDv1 protocol", + initializer.name, + ) + return True + + if basis.change_set.selector.is_defined(): + return False except Exception as e: log.error("Initializer failed with exception: %s", e) + return False def _run_synchronizers(self, set_on_ready: Event): """Run synchronizers to keep data up-to-date.""" @@ -476,12 +533,12 @@ def synchronizer_loop(self: 'FDv2'): if directive == ConditionDirective.FDV1: # Abandon all synchronizers and use only fdv1 fallback - log.info("Reverting to FDv1 fallback synchronizer") + log.warning("Falling back to FDv1 protocol") if self._fdv1_fallback_synchronizer_builder is not None: synchronizers_list = [self._fdv1_fallback_synchronizer_builder] current_index = 0 else: - log.warning("No FDv1 fallback synchronizer available") + log.warning("Synchronizer requested FDv1 fallback but none configured") synchronizers_list = [] self._data_source_status_provider.update_status( DataSourceState.OFF, @@ -608,8 +665,11 @@ def reader(self: 'FDv2'): # Update status self._data_source_status_provider.update_status(update.state, update.error) - # Check if we should revert to FDv1 immediately - if update.revert_to_fdv1: + # Check if we should fall back to FDv1 immediately. fallback_to_fdv1 + # may ride along on a Valid update (payload + directive in the same + # response), in which case the ChangeSet has already been applied + # above before we hand off. + if update.fallback_to_fdv1: return ConditionDirective.FDV1 # Check for OFF state indicating permanent failure diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index 62e20527..15d0832c 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -1366,6 +1366,13 @@ class Basis: change_set: ChangeSet persist: bool environment_id: Optional[str] = None + fallback_to_fdv1: bool = False + """ + Indicates that the LaunchDarkly server has directed the SDK to fall + back to the FDv1 Fallback Synchronizer (via the X-LD-FD-Fallback + response header). When True, callers must apply ``change_set`` first + and then terminally switch to the FDv1 Fallback Synchronizer. + """ class ChangeSetBuilder: @@ -1647,7 +1654,14 @@ class Update: state: DataSourceState change_set: Optional[ChangeSet] = None error: Optional[DataSourceErrorInfo] = None - revert_to_fdv1: bool = False + fallback_to_fdv1: bool = False + """ + Indicates that the LaunchDarkly server has directed the SDK to fall + back to the FDv1 Fallback Synchronizer (via the X-LD-FD-Fallback + response header). When True, callers must apply any accompanying + ``change_set`` first and then terminally switch to the FDv1 Fallback + Synchronizer. The flag may ride along on Valid or Off updates. + """ environment_id: Optional[str] = None diff --git a/ldclient/testing/impl/datasourcev2/test_polling_initializer.py b/ldclient/testing/impl/datasourcev2/test_polling_initializer.py index 831844d7..d59cc7df 100644 --- a/ldclient/testing/impl/datasourcev2/test_polling_initializer.py +++ b/ldclient/testing/impl/datasourcev2/test_polling_initializer.py @@ -10,7 +10,12 @@ fdv1_polling_payload_to_changeset, polling_payload_to_changeset ) -from ldclient.impl.util import UnsuccessfulResponseException, _Fail, _Success +from ldclient.impl.util import ( + _LD_FD_FALLBACK_HEADER, + UnsuccessfulResponseException, + _Fail, + _Success +) from ldclient.interfaces import ChangeSetBuilder, IntentCode from ldclient.testing.mock_components import MockSelectorStore @@ -141,6 +146,58 @@ def test_handles_transfer_changes(): assert result.value.persist is True +def test_initializer_carries_fallback_signal_on_successful_response(): + """A 200 OK with X-LD-FD-Fallback: true must produce a Basis with + fallback_to_fdv1=True so the caller can apply the payload first and then + switch terminally to the FDv1 Fallback Synchronizer.""" + change_set = ChangeSetBuilder.no_changes() + headers = {_LD_FD_FALLBACK_HEADER: 'true'} + mock_requester = MockPollingRequester(_Success(value=(change_set, headers))) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch(MockSelectorStore(Selector.no_selector())) + + assert isinstance(result, _Success) + assert result.value is not None + assert result.value.fallback_to_fdv1 is True + + +def test_initializer_propagates_fallback_signal_on_error_response(): + """An error response carrying X-LD-FD-Fallback: true must propagate the + response headers in the _Fail so the FDv2 datasystem can detect the + directive even when no payload was delivered.""" + headers = {_LD_FD_FALLBACK_HEADER: 'true'} + mock_requester = MockPollingRequester( + _Fail( + error="failure message", + exception=UnsuccessfulResponseException(500), + headers=headers, + ) + ) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch(MockSelectorStore(Selector.no_selector())) + + assert isinstance(result, _Fail) + assert result.headers is not None + assert result.headers.get(_LD_FD_FALLBACK_HEADER) == 'true' + + +def test_initializer_basis_default_fallback_is_false(): + """A response without the fallback header must produce a Basis with + fallback_to_fdv1=False.""" + change_set = ChangeSetBuilder.no_changes() + headers: dict = {} + mock_requester = MockPollingRequester(_Success(value=(change_set, headers))) + ds = PollingDataSource(poll_interval=1.0, requester=mock_requester) + + result = ds.fetch(MockSelectorStore(Selector.no_selector())) + + assert isinstance(result, _Success) + assert result.value is not None + assert result.value.fallback_to_fdv1 is False + + def test_handles_fdv1_payload(): """Test that FDv1 payloads have persist set to False.""" fdv1_data = { diff --git a/ldclient/testing/impl/datasourcev2/test_polling_synchronizer.py b/ldclient/testing/impl/datasourcev2/test_polling_synchronizer.py index 7bc1d79d..8a7d751a 100644 --- a/ldclient/testing/impl/datasourcev2/test_polling_synchronizer.py +++ b/ldclient/testing/impl/datasourcev2/test_polling_synchronizer.py @@ -116,7 +116,7 @@ def test_handles_no_changes(): assert valid.state == DataSourceState.VALID assert valid.error is None - assert valid.revert_to_fdv1 is False + assert valid.fallback_to_fdv1 is False assert valid.environment_id is None assert valid.change_set is not None assert valid.change_set.intent_code == IntentCode.TRANSFER_NONE @@ -137,7 +137,7 @@ def test_handles_empty_changeset(): assert valid.state == DataSourceState.VALID assert valid.error is None - assert valid.revert_to_fdv1 is False + assert valid.fallback_to_fdv1 is False assert valid.environment_id is None assert valid.change_set is not None @@ -165,7 +165,7 @@ def test_handles_put_objects(): assert valid.state == DataSourceState.VALID assert valid.error is None - assert valid.revert_to_fdv1 is False + assert valid.fallback_to_fdv1 is False assert valid.environment_id is None assert valid.change_set is not None @@ -196,7 +196,7 @@ def test_handles_delete_objects(): assert valid.state == DataSourceState.VALID assert valid.error is None - assert valid.revert_to_fdv1 is False + assert valid.fallback_to_fdv1 is False assert valid.environment_id is None assert valid.change_set is not None @@ -234,7 +234,7 @@ def test_generic_error_interrupts_and_recovers(): assert interrupted.error.kind == DataSourceErrorKind.NETWORK_ERROR assert interrupted.error.status_code == 0 assert interrupted.error.message == "error for test" - assert interrupted.revert_to_fdv1 is False + assert interrupted.fallback_to_fdv1 is False assert interrupted.environment_id is None assert valid.change_set is not None @@ -267,12 +267,12 @@ def test_recoverable_error_continues(): assert interrupted.error is not None assert interrupted.error.kind == DataSourceErrorKind.ERROR_RESPONSE assert interrupted.error.status_code == 408 - assert interrupted.revert_to_fdv1 is False + assert interrupted.fallback_to_fdv1 is False assert interrupted.environment_id is None assert valid.state == DataSourceState.VALID assert valid.error is None - assert valid.revert_to_fdv1 is False + assert valid.fallback_to_fdv1 is False assert valid.environment_id is None assert valid.change_set is not None @@ -303,7 +303,7 @@ def test_unrecoverable_error_shuts_down(): assert off.error is not None assert off.error.kind == DataSourceErrorKind.ERROR_RESPONSE assert off.error.status_code == 401 - assert off.revert_to_fdv1 is False + assert off.fallback_to_fdv1 is False assert off.environment_id is None assert off.change_set is None @@ -328,7 +328,7 @@ def test_envid_from_success_headers(): assert valid.state == DataSourceState.VALID assert valid.error is None - assert valid.revert_to_fdv1 is False + assert valid.fallback_to_fdv1 is False assert valid.environment_id == 'test-env-polling-123' @@ -370,7 +370,7 @@ def test_envid_from_fallback_headers(): valid = next(synchronizer.sync(MockSelectorStore(Selector.no_selector()))) assert valid.state == DataSourceState.VALID - assert valid.revert_to_fdv1 is True + assert valid.fallback_to_fdv1 is True assert valid.environment_id == 'test-env-fallback' @@ -449,7 +449,7 @@ def test_envid_from_error_with_fallback(): off = next(sync) assert off.state == DataSourceState.OFF - assert off.revert_to_fdv1 is True + assert off.fallback_to_fdv1 is True assert off.environment_id == 'test-env-503' @@ -478,3 +478,27 @@ def test_envid_from_generic_error_with_headers(): assert interrupted.error.kind == DataSourceErrorKind.NETWORK_ERROR assert valid.state == DataSourceState.VALID + + +def test_synchronizer_generic_error_with_fallback_header_halts(): + """A non-HTTP error (e.g. malformed JSON) carrying X-LD-FD-Fallback: true + must halt the synchronizer and emit OFF + fallback_to_fdv1=True rather + than retrying the FDv2 endpoint.""" + headers_error = { + _LD_ENVID_HEADER: 'test-env-generic', + _LD_FD_FALLBACK_HEADER: 'true', + } + _failure = _Fail(error="malformed body", headers=headers_error) + + synchronizer = PollingDataSource( + poll_interval=0.01, + requester=ListBasedRequester(results=iter([_failure])), + ) + updates = list(synchronizer.sync(MockSelectorStore(Selector.no_selector()))) + + assert len(updates) == 1 + assert updates[0].state == DataSourceState.OFF + assert updates[0].fallback_to_fdv1 is True + assert updates[0].environment_id == 'test-env-generic' + assert updates[0].error is not None + assert updates[0].error.kind == DataSourceErrorKind.NETWORK_ERROR diff --git a/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py b/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py index 559e2c24..a27fa237 100644 --- a/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py +++ b/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py @@ -202,7 +202,7 @@ def test_handles_no_changes(): assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is None @@ -222,7 +222,7 @@ def test_handles_empty_changeset(events): # pylint: disable=redefined-outer-nam assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is not None @@ -249,7 +249,7 @@ def test_handles_put_objects(events): # pylint: disable=redefined-outer-name assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is not None @@ -281,7 +281,7 @@ def test_handles_delete_objects(events): # pylint: disable=redefined-outer-name assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is not None @@ -312,7 +312,7 @@ def test_swallows_goodbye(events): # pylint: disable=redefined-outer-name assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is not None @@ -339,7 +339,7 @@ def test_swallows_heartbeat(events): # pylint: disable=redefined-outer-name assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is not None @@ -368,7 +368,7 @@ def test_error_resets(events): # pylint: disable=redefined-outer-name assert len(updates) == 1 assert updates[0].state == DataSourceState.VALID assert updates[0].error is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].change_set is not None @@ -392,7 +392,7 @@ def test_handles_out_of_order(events): # pylint: disable=redefined-outer-name assert len(updates) == 1 assert updates[0].state == DataSourceState.INTERRUPTED assert updates[0].change_set is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].error is not None @@ -423,7 +423,7 @@ def test_invalid_json_decoding(events): # pylint: disable=redefined-outer-name assert len(updates) == 2 assert updates[0].state == DataSourceState.INTERRUPTED assert updates[0].change_set is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].error is not None @@ -458,7 +458,7 @@ def test_stops_on_unrecoverable_status_code( assert len(updates) == 1 assert updates[0].state == DataSourceState.OFF assert updates[0].change_set is None - assert updates[0].revert_to_fdv1 is False + assert updates[0].fallback_to_fdv1 is False assert updates[0].environment_id is None assert updates[0].error is not None @@ -583,8 +583,10 @@ def test_envid_preserved_across_events(events): # pylint: disable=redefined-out assert len(updates[0].change_set.changes) == 1 -def test_envid_from_fallback_header(): - """Test that environment ID is captured when fallback header is present""" +def test_fallback_header_with_no_payload_emits_no_update(): + """A Start carrying X-LD-FD-Fallback with no following payload events + must not synthesize an Update. The directive only fires once a payload + has been applied or an error has been observed.""" start_action = Start(headers={_LD_ENVID_HEADER: 'test-env-fallback', _LD_FD_FALLBACK_HEADER: 'true'}) builder = list_sse_client([start_action]) @@ -593,10 +595,35 @@ def test_envid_from_fallback_header(): synchronizer._sse_client_builder = builder updates = list(synchronizer.sync(MockSelectorStore(Selector.no_selector()))) + assert updates == [] + + +def test_fallback_header_with_payload_emits_valid_with_fallback(events): # pylint: disable=redefined-outer-name + """When the response carries X-LD-FD-Fallback: true and a valid SSE + payload, the synchronizer must apply the payload and then emit a single + Valid update with fallback_to_fdv1=True so the consumer can hand off to + the FDv1 Fallback Synchronizer.""" + start_action = Start(headers={_LD_ENVID_HEADER: 'test-env-fallback', _LD_FD_FALLBACK_HEADER: 'true'}) + + builder = list_sse_client( + [ + start_action, + events[EventName.SERVER_INTENT], + events[EventName.PUT_OBJECT], + events[EventName.PAYLOAD_TRANSFERRED], + ] + ) + + synchronizer = make_streaming_data_source() + synchronizer._sse_client_builder = builder + updates = list(synchronizer.sync(MockSelectorStore(Selector.no_selector()))) + assert len(updates) == 1 - assert updates[0].state == DataSourceState.OFF - assert updates[0].revert_to_fdv1 is True + assert updates[0].state == DataSourceState.VALID + assert updates[0].fallback_to_fdv1 is True assert updates[0].environment_id == 'test-env-fallback' + assert updates[0].change_set is not None + assert len(updates[0].change_set.changes) == 1 def test_envid_from_fault_action(): @@ -655,7 +682,7 @@ def test_envid_from_fault_with_fallback(): assert len(updates) == 1 assert updates[0].state == DataSourceState.OFF - assert updates[0].revert_to_fdv1 is True + assert updates[0].fallback_to_fdv1 is True assert updates[0].environment_id == 'test-env-503' diff --git a/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py b/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py index d49a82aa..8e885e3a 100644 --- a/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py +++ b/ldclient/testing/impl/datasystem/test_fdv2_datasystem.py @@ -11,11 +11,20 @@ from ldclient.datasystem import file_ds_builder from ldclient.impl.datasystem import DataAvailability from ldclient.impl.datasystem.fdv2 import FDv2 +from ldclient.impl.util import _LD_FD_FALLBACK_HEADER, _Fail, _Success from ldclient.integrations.test_datav2 import TestDataV2 from ldclient.interfaces import ( + Basis, + BasisResult, + ChangeSetBuilder, DataSourceState, DataSourceStatus, FlagChange, + Initializer, + IntentCode, + ObjectKind, + Selector, + SelectorStore, Synchronizer, Update ) @@ -32,6 +41,31 @@ def build(self, config: Config) -> Synchronizer: # pylint: disable=unused-argum return self._mock +class _StaticInitializer(Initializer): + """A test initializer that returns a fixed BasisResult.""" + + def __init__(self, name: str, result: BasisResult): + self._name = name + self._result = result + + @property + def name(self) -> str: + return self._name + + def fetch(self, ss: SelectorStore) -> BasisResult: # pylint: disable=unused-argument + return self._result + + +class _InitializerBuilder(DataSourceBuilder): # pylint: disable=too-few-public-methods + """Wraps a static Initializer as a DataSourceBuilder.""" + + def __init__(self, initializer: Initializer): + self._initializer = initializer + + def build(self, config: Config) -> Initializer: # pylint: disable=unused-argument + return self._initializer + + def test_two_phase_init(): td_initializer = TestDataV2.data_source() td_initializer.update(td_initializer.flag("feature-flag").on(True)) @@ -196,11 +230,11 @@ def test_fdv2_falls_back_to_fdv1_on_polling_error_with_header(): mock_primary.name = "mock-primary" mock_primary.stop = Mock() - # Simulate a synchronizer that yields an OFF state with revert_to_fdv1=True + # Simulate a synchronizer that yields an OFF state with fallback_to_fdv1=True mock_primary.sync.return_value = iter([ Update( state=DataSourceState.OFF, - revert_to_fdv1=True + fallback_to_fdv1=True ) ]) @@ -250,7 +284,7 @@ def test_fdv2_falls_back_to_fdv1_on_polling_success_with_header(): mock_primary.sync.return_value = iter([ Update( state=DataSourceState.VALID, - revert_to_fdv1=True + fallback_to_fdv1=True ) ]) @@ -304,7 +338,7 @@ def test_fdv2_falls_back_to_fdv1_with_initializer(): mock_primary.sync.return_value = iter([ Update( state=DataSourceState.OFF, - revert_to_fdv1=True + fallback_to_fdv1=True ) ]) @@ -353,7 +387,7 @@ def test_fdv2_no_fallback_without_header(): mock_primary.sync.return_value = iter([ Update( state=DataSourceState.INTERRUPTED, - revert_to_fdv1=False # No fallback + fallback_to_fdv1=False # No fallback ) ]) @@ -364,7 +398,7 @@ def test_fdv2_no_fallback_without_header(): mock_secondary.sync.return_value = iter([ Update( state=DataSourceState.VALID, - revert_to_fdv1=False + fallback_to_fdv1=False ) ]) @@ -407,7 +441,7 @@ def test_fdv2_stays_on_fdv1_after_fallback(): mock_primary.sync.return_value = iter([ Update( state=DataSourceState.OFF, - revert_to_fdv1=True + fallback_to_fdv1=True ) ]) @@ -548,3 +582,171 @@ def listener(_: FlagChange): fdv2.stop() finally: os.remove(path) + + +def _basis_with_one_flag(flag_key: str, fallback_to_fdv1: bool) -> Basis: + """Builds a Basis containing a single flag, optionally carrying the FDv1 fallback signal.""" + builder = ChangeSetBuilder() + builder.start(IntentCode.TRANSFER_FULL) + builder.add_put( + ObjectKind.FLAG, + flag_key, + 1, + {"key": flag_key, "version": 1, "on": True, "variations": [True, False]}, + ) + change_set = builder.finish(Selector(state="initializer-state", version=1)) + return Basis( + change_set=change_set, + persist=True, + environment_id=None, + fallback_to_fdv1=fallback_to_fdv1, + ) + + +def test_fdv2_initializer_fallback_with_payload_engages_fdv1_synchronizer(): + """ + When an initializer returns a successful Basis carrying the FDv1 fallback + signal, the SDK must apply the payload, skip configured FDv2 synchronizers, + and run the FDv1 Fallback Synchronizer instead. + """ + init = _StaticInitializer( + "fallback-initializer", + _Success(value=_basis_with_one_flag("init-flag", fallback_to_fdv1=True)), + ) + + # FDv2 streaming synchronizer that should never produce updates because we + # were directed to fall back during initialization. + fdv2_sync_mock: Synchronizer = Mock() + fdv2_sync_mock.name = "fdv2-sync-should-not-run" + fdv2_sync_mock.stop = Mock() + fdv2_sync_mock.sync.return_value = iter([]) + + td_fdv1 = TestDataV2.data_source() + td_fdv1.update(td_fdv1.flag("fdv1-flag").on(True)) + + data_system_config = DataSystemConfig( + initializers=[_InitializerBuilder(init)], + synchronizers=[MockDataSourceBuilder(fdv2_sync_mock)], + fdv1_fallback_synchronizer=td_fdv1.builder, + ) + + fdv1_flag_seen = Event() + init_flag_seen = Event() + + def listener(flag_change: FlagChange): + if flag_change.key == "init-flag": + init_flag_seen.set() + elif flag_change.key == "fdv1-flag": + fdv1_flag_seen.set() + + set_on_ready = Event() + fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config) + fdv2.flag_change_listeners.add(listener) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + # The initializer's payload must be applied before the handoff. + assert init_flag_seen.wait(1), "Initializer payload was not applied before fallback" + # The configured FDv2 synchronizer must not run after a directive. + fdv2_sync_mock.sync.assert_not_called() + # And the FDv1 Fallback Synchronizer must take over. + assert fdv1_flag_seen.wait(1), "FDv1 fallback synchronizer did not run after directive" + + +def test_fdv2_initializer_fallback_without_fdv1_configured_transitions_to_off(): + """ + When an initializer signals FDv1 fallback but no FDv1 Fallback Synchronizer + is configured, the data source status must transition to OFF rather than + silently dropping the directive or stalling at INITIALIZING. + """ + init = _StaticInitializer( + "fallback-initializer-no-fdv1", + _Fail( + error="boom", + exception=None, + headers={_LD_FD_FALLBACK_HEADER: 'true'}, + ), + ) + + # An FDv2 synchronizer that would otherwise be tried -- it must not run. + fdv2_sync_mock: Synchronizer = Mock() + fdv2_sync_mock.name = "fdv2-sync-should-not-run" + fdv2_sync_mock.stop = Mock() + fdv2_sync_mock.sync.return_value = iter([]) + + data_system_config = DataSystemConfig( + initializers=[_InitializerBuilder(init)], + synchronizers=[MockDataSourceBuilder(fdv2_sync_mock)], + fdv1_fallback_synchronizer=None, + ) + + off = Event() + + def listener(status: DataSourceStatus): + if status.state == DataSourceState.OFF: + off.set() + + set_on_ready = Event() + fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config) + fdv2.data_source_status_provider.add_listener(listener) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + assert off.wait(1), "Data source did not transition to OFF after directive without fallback" + fdv2_sync_mock.sync.assert_not_called() + + +def test_fdv2_synchronizer_fallback_on_success_with_payload(): + """ + When a synchronizer emits a Valid update carrying both a ChangeSet and the + FDv1 fallback signal, the SDK must apply the payload before terminally + handing off to the FDv1 Fallback Synchronizer. + """ + builder = ChangeSetBuilder() + builder.start(IntentCode.TRANSFER_FULL) + builder.add_put( + ObjectKind.FLAG, + "fdv2-payload-flag", + 1, + {"key": "fdv2-payload-flag", "version": 1, "on": True, "variations": [True, False]}, + ) + change_set = builder.finish(Selector(state="state", version=1)) + + mock_primary: Synchronizer = Mock() + mock_primary.name = "mock-primary" + mock_primary.stop = Mock() + mock_primary.sync.return_value = iter([ + Update( + state=DataSourceState.VALID, + change_set=change_set, + fallback_to_fdv1=True, + ) + ]) + + td_fdv1 = TestDataV2.data_source() + td_fdv1.update(td_fdv1.flag("fdv1-flag").on(True)) + + data_system_config = DataSystemConfig( + initializers=None, + synchronizers=[MockDataSourceBuilder(mock_primary)], + fdv1_fallback_synchronizer=td_fdv1.builder, + ) + + fdv1_flag_seen = Event() + payload_flag_seen = Event() + + def listener(flag_change: FlagChange): + if flag_change.key == "fdv2-payload-flag": + payload_flag_seen.set() + elif flag_change.key == "fdv1-flag": + fdv1_flag_seen.set() + + set_on_ready = Event() + fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config) + fdv2.flag_change_listeners.add(listener) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + # The Valid update's payload must be applied before the handoff. + assert payload_flag_seen.wait(1), "FDv2 payload was not applied before fallback" + assert fdv1_flag_seen.wait(1), "FDv1 fallback synchronizer did not run after directive" From 5d0755de8923ab5bbb8a822a2fa16333029b4c52 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 30 Apr 2026 11:12:42 -0400 Subject: [PATCH 2/3] test: wire FDv1 Fallback Directive into contract test service Declare the fdv1-fallback capability and accept the new top-level dataSystem.fdv1Fallback config object, wiring it directly to the SDK's FDv1 Fallback Synchronizer. Drop the heuristic that previously inferred the FDv1 fallback from the last polling synchronizer entry -- the FDv1 Fallback Synchronizer is a distinct configuration concern from the FDv2 Primary/Fallback chain. Also bump the test harness pin to v3.0.0-alpha.6 so the new fdv1-fallback test suite runs in CI. --- .github/workflows/ci.yml | 2 +- contract-tests/client_entity.py | 17 ++++++++++------- contract-tests/service.py | 1 + 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8261bd95..8eaad8b0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,7 @@ jobs: with: test_service_port: 9000 token: ${{ secrets.GITHUB_TOKEN }} - version: v3.0.0-alpha.4 + version: v3.0.0-alpha.6 enable_persistence_tests: "true" windows: diff --git a/contract-tests/client_entity.py b/contract-tests/client_entity.py index ae1b9eea..e719d845 100644 --- a/contract-tests/client_entity.py +++ b/contract-tests/client_entity.py @@ -62,7 +62,6 @@ def __init__(self, tag, config): sync_configs = datasystem_config.get('synchronizers') if sync_configs is not None: sync_builders = [] - fallback_builder = None for sync_config in sync_configs: streaming = sync_config.get('streaming') @@ -79,14 +78,18 @@ def __init__(self, tag, config): _set_optional_time(polling, "pollIntervalMs", builder.poll_interval) sync_builders.append(builder) - fallback_builder = fdv1_fallback_ds_builder() - _set_optional_value(polling, "baseUri", fallback_builder.base_uri) - _set_optional_time(polling, "pollIntervalMs", fallback_builder.poll_interval) - if sync_builders: datasystem.synchronizers(*sync_builders) - if fallback_builder is not None: - datasystem.fdv1_compatible_synchronizer(fallback_builder) + + # The FDv1 Fallback Synchronizer is engaged only in response to a + # server-directed FDv1 Fallback Directive; it is configured separately + # from the FDv2 Primary/Fallback synchronizer chain. + fdv1_fallback_config = datasystem_config.get('fdv1Fallback') + if fdv1_fallback_config is not None: + fallback_builder = fdv1_fallback_ds_builder() + _set_optional_value(fdv1_fallback_config, "baseUri", fallback_builder.base_uri) + _set_optional_time(fdv1_fallback_config, "pollIntervalMs", fallback_builder.poll_interval) + datasystem.fdv1_compatible_synchronizer(fallback_builder) if datasystem_config.get("payloadFilter") is not None: opts["payload_filter_key"] = datasystem_config["payloadFilter"] diff --git a/contract-tests/service.py b/contract-tests/service.py index 7b023bcf..342c3870 100644 --- a/contract-tests/service.py +++ b/contract-tests/service.py @@ -84,6 +84,7 @@ def status(): 'persistent-data-store-consul', 'flag-change-listeners', 'flag-value-change-listeners', + 'fdv1-fallback', ] } return json.dumps(body), 200, {'Content-type': 'application/json'} From f37695bcb7ee67810c5f4c30418a39504e4d4bb5 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 30 Apr 2026 12:15:11 -0400 Subject: [PATCH 3/3] fix: tear down urllib3 pool when stopping FDv2 streaming sync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per the FDv1 Fallback Directive (Data System spec 1.6.3), the Primary Synchronizer must be terminated when the directive engages. The streaming source called ``SSEClient.close()`` to do this, but on Python 3.10 that wasn't enough: ``SSEClient.close()`` ultimately invokes ``urllib3.HTTPResponse.shutdown()`` (which only performs a half-close on the local read side) plus ``release_conn()`` (which returns the live connection to the pool). The TCP socket stayed open until the response was garbage-collected, which the contract test harness flagged on the linux/3.10 matrix entry as "SDK did not close the FDv2 streaming connection after the directive — the Primary Synchronizer must be stopped when Directed Fallback engages." Newer Python versions GC the response promptly enough to hide the leak. Track the urllib3 PoolManager alongside the SSEClient and call ``HTTPConnectionPool.close()`` on each pool when the synchronizer stops. ``HTTPConnectionPool.close()`` drains the connection queue and closes each socket, sending the FIN the server is waiting on. ``PoolManager.clear()`` alone won't do this -- it drops the dict of pools without closing the connections inside them. The fix is deterministic (no sleeps) and runs both at the natural end of ``sync()`` and in ``stop()`` so it's robust to whichever thread arrives first. --- ldclient/impl/datasourcev2/streaming.py | 67 +++++++++++++++++-- .../test_streaming_synchronizer.py | 59 ++++++++++++++++ 2 files changed, 120 insertions(+), 6 deletions(-) diff --git a/ldclient/impl/datasourcev2/streaming.py b/ldclient/impl/datasourcev2/streaming.py index 47e89927..8180c901 100644 --- a/ldclient/impl/datasourcev2/streaming.py +++ b/ldclient/impl/datasourcev2/streaming.py @@ -8,6 +8,7 @@ from typing import Callable, Generator, Optional, Tuple from urllib import parse +import urllib3 from ld_eventsource import SSEClient from ld_eventsource.actions import Event, Fault, Start from ld_eventsource.config import ( @@ -57,7 +58,10 @@ STREAMING_ENDPOINT = "/sdk/stream" -SseClientBuilder = Callable[[str, HTTPConfig, float, Config, SelectorStore], SSEClient] +SseClientBuilder = Callable[ + [str, HTTPConfig, float, Config, SelectorStore], + Tuple[SSEClient, Optional[urllib3.PoolManager]], +] def create_sse_client( @@ -66,10 +70,17 @@ def create_sse_client( initial_reconnect_delay: float, config: Config, ss: SelectorStore -) -> SSEClient: +) -> Tuple[SSEClient, Optional[urllib3.PoolManager]]: """ " create_sse_client creates an SSEClient instance configured to connect - to the LaunchDarkly streaming endpoint. + to the LaunchDarkly streaming endpoint, along with the urllib3 PoolManager + backing it. The pool is returned alongside the client so the caller can + force-close any pooled connections on shutdown -- ``SSEClient.close()`` + only releases the connection back to the pool via ``urllib3.HTTPResponse + .shutdown()`` (which performs a half-close on the local read side) plus + ``release_conn()``, neither of which actually closes the underlying TCP + socket on Python 3.10. Closing the pool ensures the server observes the + client's disconnect when the FDv1 Fallback Directive engages. """ uri = base_uri + STREAMING_ENDPOINT if config.payload_filter_key is not None: @@ -87,11 +98,12 @@ def query_params() -> dict[str, str]: selector = ss.selector() return {"basis": selector.state} if selector.is_defined() else {} - return SSEClient( + pool = stream_http_factory.create_pool_manager(1, uri) + sse_client = SSEClient( connect=ConnectStrategy.http( url=uri, headers=base_headers, - pool=stream_http_factory.create_pool_manager(1, uri), + pool=pool, urllib3_request_options={"timeout": stream_http_factory.timeout}, query_params=query_params ), @@ -106,6 +118,31 @@ def query_params() -> dict[str, str]: retry_delay_reset_threshold=BACKOFF_RESET_INTERVAL, logger=log, ) + return sse_client, pool + + +def _close_pool_manager(pool: Optional[urllib3.PoolManager]) -> None: + """Close every pooled connection in ``pool`` so the underlying TCP sockets + are torn down. ``HTTPConnectionPool.close()`` drains its queue and calls + ``conn.close()`` on each connection, which sends the FIN that the server + is waiting on. ``PoolManager.clear()`` alone doesn't do this -- it just + drops the dict of pools without closing the connections inside them.""" + if pool is None: + return + try: + # ``RecentlyUsedContainer`` deliberately disallows iteration; ``keys()`` + # returns a thread-safe snapshot. We look each one up to close its + # underlying ``HTTPConnectionPool``. + for key in list(pool.pools.keys()): + try: + connection_pool = pool.pools.get(key) + if connection_pool is not None: + connection_pool.close() + except Exception: # pylint: disable=broad-except + log.debug("Error closing streaming connection pool", exc_info=True) + pool.clear() + except Exception: # pylint: disable=broad-except + log.debug("Error closing streaming pool manager", exc_info=True) class StreamingDataSource(Synchronizer, DiagnosticSource): @@ -129,6 +166,7 @@ def __init__(self, self._sse_client_builder = create_sse_client self._config = config self._sse: Optional[SSEClient] = None + self._sse_pool: Optional[urllib3.PoolManager] = None self._running = False self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None self._connection_attempt_start_time: Optional[float] = None @@ -149,13 +187,19 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: Update objects until the connection is closed or an unrecoverable error occurs. """ - self._sse = self._sse_client_builder( + builder_result = self._sse_client_builder( self.__uri, self.__http_options, self.__initial_reconnect_delay, self._config, ss ) + # Tests may inject a builder that returns either an SSEClient directly + # or a (client, pool) tuple. Accept both. + if isinstance(builder_result, tuple): + self._sse, self._sse_pool = builder_result + else: + self._sse, self._sse_pool = builder_result, None if self._sse is None: log.error("Failed to create SSE client for streaming updates.") @@ -243,6 +287,13 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: ) self._sse.close() + # Force-close the underlying urllib3 pool. SSEClient.close() only does a + # half-close on the local read side and releases the connection back to + # the pool, which on Python 3.10 leaves the TCP socket open until the + # response object is garbage-collected. The FDv1 Fallback Directive + # requires the Primary Synchronizer to be terminated promptly, so we + # tear down the pool here to send the FIN the server is waiting on. + _close_pool_manager(self._sse_pool) def stop(self): """ @@ -252,6 +303,10 @@ def stop(self): self._running = False if self._sse: self._sse.close() + # See _close_pool_manager docstring: this is what actually severs the + # TCP connection. ``stop()`` may be called from a different thread than + # the one running ``sync()``; close() is idempotent on the pool. + _close_pool_manager(self._sse_pool) def _record_stream_init(self, failed: bool): if self._diagnostic_accumulator and self._connection_attempt_start_time: diff --git a/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py b/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py index a27fa237..62fb0964 100644 --- a/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py +++ b/ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py @@ -626,6 +626,65 @@ def test_fallback_header_with_payload_emits_valid_with_fallback(events): # pyli assert len(updates[0].change_set.changes) == 1 +def test_streaming_closes_underlying_pool_on_fallback(events): # pylint: disable=redefined-outer-name + """When the FDv1 Fallback Directive engages, the underlying urllib3 + connection pool must be torn down so the FDv2 streaming TCP connection + is actually closed. ``SSEClient.close()`` only releases the connection + back to the pool via a half-close; on Python 3.10 that leaves the socket + open until GC, which the spec forbids -- the Primary Synchronizer must + be terminated promptly when the directive fires.""" + pool_close_calls = [] + + class TrackingPool: + """Stand-in PoolManager that records calls to clear() and exposes a + keys()-iterable pools attribute matching urllib3's RecentlyUsedContainer.""" + + def __init__(self): + self.cleared = False + self.connection_pool = TrackingConnectionPool() + self.pools = TrackingPoolDict({"key": self.connection_pool}) + + def clear(self): + self.cleared = True + + class TrackingConnectionPool: + def __init__(self): + self.closed = False + + def close(self): + self.closed = True + pool_close_calls.append(self) + + class TrackingPoolDict: + def __init__(self, items): + self._items = items + + def keys(self): + return list(self._items.keys()) + + def get(self, key): + return self._items.get(key) + + tracking_pool = TrackingPool() + + def builder(*_args, **_kwargs): + return ListBasedSseClient([ + Start(headers={_LD_FD_FALLBACK_HEADER: 'true'}), + events[EventName.SERVER_INTENT], + events[EventName.PUT_OBJECT], + events[EventName.PAYLOAD_TRANSFERRED], + ]), tracking_pool + + synchronizer = make_streaming_data_source() + synchronizer._sse_client_builder = builder + updates = list(synchronizer.sync(MockSelectorStore(Selector.no_selector()))) + + assert len(updates) == 1 + assert updates[0].fallback_to_fdv1 is True + assert tracking_pool.cleared is True + assert tracking_pool.connection_pool.closed is True + + def test_envid_from_fault_action(): """Test that environment ID is captured from Fault action headers""" error = HTTPStatusError(401, headers={_LD_ENVID_HEADER: 'test-env-fault'})