Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
with:
test_service_port: 9000
token: ${{ secrets.GITHUB_TOKEN }}
version: v3.0.0-alpha.4
version: v3.0.0-alpha.6
enable_persistence_tests: "true"

windows:
Expand Down
17 changes: 10 additions & 7 deletions contract-tests/client_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def __init__(self, tag, config):
sync_configs = datasystem_config.get('synchronizers')
if sync_configs is not None:
sync_builders = []
fallback_builder = None

for sync_config in sync_configs:
streaming = sync_config.get('streaming')
Expand All @@ -79,14 +78,18 @@ def __init__(self, tag, config):
_set_optional_time(polling, "pollIntervalMs", builder.poll_interval)
sync_builders.append(builder)

fallback_builder = fdv1_fallback_ds_builder()
_set_optional_value(polling, "baseUri", fallback_builder.base_uri)
_set_optional_time(polling, "pollIntervalMs", fallback_builder.poll_interval)

if sync_builders:
datasystem.synchronizers(*sync_builders)
if fallback_builder is not None:
datasystem.fdv1_compatible_synchronizer(fallback_builder)

# The FDv1 Fallback Synchronizer is engaged only in response to a
# server-directed FDv1 Fallback Directive; it is configured separately
# from the FDv2 Primary/Fallback synchronizer chain.
fdv1_fallback_config = datasystem_config.get('fdv1Fallback')
if fdv1_fallback_config is not None:
fallback_builder = fdv1_fallback_ds_builder()
_set_optional_value(fdv1_fallback_config, "baseUri", fallback_builder.base_uri)
_set_optional_time(fdv1_fallback_config, "pollIntervalMs", fallback_builder.poll_interval)
datasystem.fdv1_compatible_synchronizer(fallback_builder)

if datasystem_config.get("payloadFilter") is not None:
opts["payload_filter_key"] = datasystem_config["payloadFilter"]
Expand Down
1 change: 1 addition & 0 deletions contract-tests/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def status():
'persistent-data-store-consul',
'flag-change-listeners',
'flag-value-change-listeners',
'fdv1-fallback',
]
}
return json.dumps(body), 200, {'Content-type': 'application/json'}
Expand Down
23 changes: 20 additions & 3 deletions ldclient/impl/datasourcev2/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
yield Update(
state=DataSourceState.OFF,
error=error_info,
revert_to_fdv1=True,
fallback_to_fdv1=True,
environment_id=envid,
)
break
Expand Down Expand Up @@ -168,6 +168,17 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
message=result.error,
)

# Even a non-HTTP error (e.g. malformed JSON) can carry the fallback
# header. If so, halt rather than retrying the FDv2 endpoint.
if fallback:
yield Update(
state=DataSourceState.OFF,
error=error_info,
fallback_to_fdv1=True,
environment_id=envid,
)
break

yield Update(
state=DataSourceState.INTERRUPTED,
error=error_info,
Expand All @@ -179,7 +190,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
state=DataSourceState.VALID,
change_set=change_set,
environment_id=headers.get(_LD_ENVID_HEADER),
revert_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
fallback_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
)

if self._interrupt_event.wait(self._poll_interval):
Expand All @@ -204,13 +215,18 @@ def _poll(self, ss: SelectorStore) -> BasisResult:
if is_http_error_recoverable(status_code):
log.warning(http_error_message_result)

# Forward any response headers so callers (e.g. FDv2 datasystem)
# can read the X-LD-FD-Fallback directive even on error.
return _Fail(
error=http_error_message_result, exception=result.exception
error=http_error_message_result,
exception=result.exception,
headers=result.headers,
)

return _Fail(
error=result.error or "Failed to request payload",
exception=result.exception,
headers=result.headers,
)

(change_set, headers) = result.value
Expand All @@ -223,6 +239,7 @@ def _poll(self, ss: SelectorStore) -> BasisResult:
change_set=change_set,
persist=change_set.selector.is_defined(),
environment_id=env_id,
fallback_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true',
)

return _Success(value=basis)
Expand Down
106 changes: 85 additions & 21 deletions ldclient/impl/datasourcev2/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Callable, Generator, Optional, Tuple
from urllib import parse

import urllib3
from ld_eventsource import SSEClient
from ld_eventsource.actions import Event, Fault, Start
from ld_eventsource.config import (
Expand Down Expand Up @@ -57,7 +58,10 @@

STREAMING_ENDPOINT = "/sdk/stream"

SseClientBuilder = Callable[[str, HTTPConfig, float, Config, SelectorStore], SSEClient]
SseClientBuilder = Callable[
[str, HTTPConfig, float, Config, SelectorStore],
Tuple[SSEClient, Optional[urllib3.PoolManager]],
]


def create_sse_client(
Expand All @@ -66,10 +70,17 @@ def create_sse_client(
initial_reconnect_delay: float,
config: Config,
ss: SelectorStore
) -> SSEClient:
) -> Tuple[SSEClient, Optional[urllib3.PoolManager]]:
""" "
create_sse_client creates an SSEClient instance configured to connect
to the LaunchDarkly streaming endpoint.
to the LaunchDarkly streaming endpoint, along with the urllib3 PoolManager
backing it. The pool is returned alongside the client so the caller can
force-close any pooled connections on shutdown -- ``SSEClient.close()``
only releases the connection back to the pool via ``urllib3.HTTPResponse
.shutdown()`` (which performs a half-close on the local read side) plus
``release_conn()``, neither of which actually closes the underlying TCP
socket on Python 3.10. Closing the pool ensures the server observes the
client's disconnect when the FDv1 Fallback Directive engages.
"""
uri = base_uri + STREAMING_ENDPOINT
if config.payload_filter_key is not None:
Expand All @@ -87,11 +98,12 @@ def query_params() -> dict[str, str]:
selector = ss.selector()
return {"basis": selector.state} if selector.is_defined() else {}

return SSEClient(
pool = stream_http_factory.create_pool_manager(1, uri)
sse_client = SSEClient(
connect=ConnectStrategy.http(
url=uri,
headers=base_headers,
pool=stream_http_factory.create_pool_manager(1, uri),
pool=pool,
urllib3_request_options={"timeout": stream_http_factory.timeout},
query_params=query_params
),
Expand All @@ -106,6 +118,31 @@ def query_params() -> dict[str, str]:
retry_delay_reset_threshold=BACKOFF_RESET_INTERVAL,
logger=log,
)
return sse_client, pool


def _close_pool_manager(pool: Optional[urllib3.PoolManager]) -> None:
    """Tear down every connection held by ``pool`` so the underlying TCP
    sockets are actually closed.

    ``HTTPConnectionPool.close()`` empties its internal queue and invokes
    ``conn.close()`` on each pooled connection, which sends the FIN the
    server is waiting on. ``PoolManager.clear()`` on its own just discards
    the mapping of pools without closing the connections they hold, so each
    per-host pool is closed explicitly before clearing.
    """
    if pool is None:
        return
    try:
        # A ``RecentlyUsedContainer`` intentionally forbids direct
        # iteration; ``keys()`` hands back a thread-safe snapshot. Each key
        # is resolved to its ``HTTPConnectionPool`` so it can be closed.
        for pool_key in list(pool.pools.keys()):
            try:
                http_pool = pool.pools.get(pool_key)
                if http_pool is not None:
                    http_pool.close()
            except Exception:  # pylint: disable=broad-except
                log.debug("Error closing streaming connection pool", exc_info=True)
        pool.clear()
    except Exception:  # pylint: disable=broad-except
        log.debug("Error closing streaming pool manager", exc_info=True)


class StreamingDataSource(Synchronizer, DiagnosticSource):
Expand All @@ -129,6 +166,7 @@ def __init__(self,
self._sse_client_builder = create_sse_client
self._config = config
self._sse: Optional[SSEClient] = None
self._sse_pool: Optional[urllib3.PoolManager] = None
self._running = False
self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None
self._connection_attempt_start_time: Optional[float] = None
Expand All @@ -149,13 +187,19 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
Update objects until the connection is closed or an unrecoverable error
occurs.
"""
self._sse = self._sse_client_builder(
builder_result = self._sse_client_builder(
self.__uri,
self.__http_options,
self.__initial_reconnect_delay,
self._config,
ss
)
# Tests may inject a builder that returns either an SSEClient directly
# or a (client, pool) tuple. Accept both.
if isinstance(builder_result, tuple):
self._sse, self._sse_pool = builder_result
else:
self._sse, self._sse_pool = builder_result, None

if self._sse is None:
log.error("Failed to create SSE client for streaming updates.")
Expand All @@ -166,6 +210,11 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
self._connection_attempt_start_time = time()

envid = None
# fallback_requested is set when a Start action carries
# X-LD-FD-Fallback: true. We finish applying the current payload
# before halting, so consumers can serve the server-provided data
# while FDv1 takes over.
fallback_requested = False
for action in self._sse.all:
if isinstance(action, Fault):
# If the SSE client detects the stream has closed, then it will
Expand All @@ -186,17 +235,9 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
continue

if isinstance(action, Start) and action.headers is not None:
fallback = action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
envid = action.headers.get(_LD_ENVID_HEADER, envid)

if fallback:
self._record_stream_init(True)
yield Update(
state=DataSourceState.OFF,
revert_to_fdv1=True,
environment_id=envid,
)
break
if action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true':
fallback_requested = True

if not isinstance(action, Event):
continue
Expand All @@ -206,6 +247,18 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
if update is not None:
self._record_stream_init(False)
self._connection_attempt_start_time = None
if fallback_requested:
# Decorate the completed update with the fallback signal,
# then halt — the consumer will switch to FDv1.
update = Update(
state=update.state,
change_set=update.change_set,
error=update.error,
fallback_to_fdv1=True,
environment_id=update.environment_id,
)
yield update
break
yield update
except json.decoder.JSONDecodeError as e:
log.info(
Expand All @@ -229,11 +282,18 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
error=DataSourceErrorInfo(
DataSourceErrorKind.UNKNOWN, 0, time(), str(e)
),
revert_to_fdv1=False,
fallback_to_fdv1=False,
environment_id=envid,
)

self._sse.close()
# Force-close the underlying urllib3 pool. SSEClient.close() only does a
# half-close on the local read side and releases the connection back to
# the pool, which on Python 3.10 leaves the TCP socket open until the
# response object is garbage-collected. The FDv1 Fallback Directive
# requires the Primary Synchronizer to be terminated promptly, so we
# tear down the pool here to send the FIN the server is waiting on.
_close_pool_manager(self._sse_pool)

def stop(self):
"""
Expand All @@ -243,6 +303,10 @@ def stop(self):
self._running = False
if self._sse:
self._sse.close()
# See _close_pool_manager docstring: this is what actually severs the
# TCP connection. ``stop()`` may be called from a different thread than
# the one running ``sync()``; close() is idempotent on the pool.
_close_pool_manager(self._sse_pool)

def _record_stream_init(self, failed: bool):
if self._diagnostic_accumulator and self._connection_attempt_start_time:
Expand Down Expand Up @@ -353,7 +417,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
error=DataSourceErrorInfo(
DataSourceErrorKind.INVALID_DATA, 0, time(), str(error)
),
revert_to_fdv1=False,
fallback_to_fdv1=False,
environment_id=envid,
)
return (update, True)
Expand All @@ -377,7 +441,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
update = Update(
state=DataSourceState.OFF,
error=error_info,
revert_to_fdv1=True,
fallback_to_fdv1=True,
environment_id=envid,
)
self.stop()
Expand All @@ -394,7 +458,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
else DataSourceState.OFF
),
error=error_info,
revert_to_fdv1=False,
fallback_to_fdv1=False,
environment_id=envid,
)

Expand All @@ -416,7 +480,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
error=DataSourceErrorInfo(
DataSourceErrorKind.UNKNOWN, 0, time(), str(error)
),
revert_to_fdv1=False,
fallback_to_fdv1=False,
environment_id=envid,
)
# no stacktrace here because, for a typical connection error, it'll
Expand Down
Loading
Loading