Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions ably/transport/websockettransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def __init__(self, connection_manager: ConnectionManager, host: str, params: dic
def connect(self):
headers = HttpUtils.default_headers()
query_params = urllib.parse.urlencode(self.params)
ws_url = (f'wss://{self.host}?{query_params}')
scheme = 'wss' if self.options.tls else 'ws'
ws_url = f'{scheme}://{self.host}?{query_params}'
log.info(f'connect(): attempting to connect to {ws_url}')
self.ws_connect_task = asyncio.create_task(self.ws_connect(ws_url, headers))
self.ws_connect_task.add_done_callback(self.on_ws_connect_done)
Expand Down Expand Up @@ -124,6 +125,11 @@ async def _handle_websocket_connection(self, ws_url, websocket):
if not self.is_disposed:
await self.dispose()
self.connection_manager.deactivate_transport(err)
else:
# Read loop exited normally (e.g., server sent normal WS close frame)
if not self.is_disposed:
await self.dispose()
self.connection_manager.deactivate_transport()

async def on_protocol_message(self, msg):
self.on_activity()
Expand Down Expand Up @@ -284,8 +290,9 @@ async def send(self, message: dict):
await self.websocket.send(raw_msg)

def set_idle_timer(self, timeout: float):
if not self.idle_timer:
self.idle_timer = Timer(timeout, self.on_idle_timer_expire)
if self.idle_timer:
self.idle_timer.cancel()
self.idle_timer = Timer(timeout, self.on_idle_timer_expire)

async def on_idle_timer_expire(self):
self.idle_timer = None
Expand Down
13 changes: 10 additions & 3 deletions test/ably/realtime/realtimechannelmutablemessages_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ async def test_append_message_with_string_data(self):

def on_message(message):
messages_received.append(message)
append_received.finish()
if len(messages_received) == 2:
append_received.finish()

await channel.subscribe(on_message)

Expand All @@ -254,15 +255,21 @@ def on_message(message):
channel, serial, MessageAction.MESSAGE_UPDATE
)

second_append_result = await channel.append_message(append_message, append_operation)

await append_received.wait()

assert messages_received[0].data == ' appended data'
assert messages_received[0].action == MessageAction.MESSAGE_APPEND
assert messages_received[0].data == 'Initial data appended data'
assert messages_received[0].action == MessageAction.MESSAGE_UPDATE
assert appended_message.data == 'Initial data appended data'
assert appended_message.version.serial == append_result.version_serial
assert appended_message.version.description == 'Appended to message'
assert appended_message.serial == serial

assert messages_received[1].data == ' appended data'
assert messages_received[1].action == MessageAction.MESSAGE_APPEND
assert messages_received[1].version.serial == second_append_result.version_serial

async def wait_until_message_with_action_appears(self, channel, serial, action):
message: Message | None = None
async def check_message_action():
Expand Down
104 changes: 104 additions & 0 deletions test/ably/realtime/realtimeconnection_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import asyncio

import pytest
from websockets import connect as _ws_connect

try:
# websockets 15+ preferred import
from websockets.asyncio.server import serve as ws_serve
except ImportError:
# websockets 14 and earlier fallback
from websockets.server import serve as ws_serve

from ably.realtime.connection import ConnectionEvent, ConnectionState
from ably.transport.defaults import Defaults
Expand All @@ -10,6 +18,68 @@
from test.ably.utils import BaseAsyncTestCase


async def _relay(src, dst):
try:
async for msg in src:
await dst.send(msg)
except Exception:
pass


class WsProxy:
"""Local WS proxy that forwards to real Ably and lets tests trigger a normal close."""

def __init__(self, target_host: str):
self.target_host = target_host
self.server = None
self.port: int | None = None
self._close_event: asyncio.Event | None = None

async def _handler(self, client_ws):
# Create a fresh event for this connection; signal to drop the connection cleanly
self._close_event = asyncio.Event()
path = client_ws.request.path # e.g. "/?key=...&format=json"
target_url = f"wss://{self.target_host}{path}"
try:
async with _ws_connect(target_url, ping_interval=None) as server_ws:
c2s = asyncio.create_task(_relay(client_ws, server_ws))
s2c = asyncio.create_task(_relay(server_ws, client_ws))
close_task = asyncio.create_task(self._close_event.wait())
try:
await asyncio.wait([c2s, s2c, close_task], return_when=asyncio.FIRST_COMPLETED)
finally:
c2s.cancel()
s2c.cancel()
close_task.cancel()
except Exception:
pass
# After _handler returns the websockets server sends a normal close frame (1000)

async def close_active_connection(self):
"""Trigger a normal WS close (code 1000) on the currently active client connection.

Signals the handler to exit; the websockets server framework then sends the
close frame automatically when the handler coroutine returns.
"""
if self._close_event:
self._close_event.set()

@property
def endpoint(self) -> str:
"""Endpoint string to pass to AblyRealtime (combine with tls=False)."""
return f"127.0.0.1:{self.port}"

async def __aenter__(self):
self.server = await ws_serve(self._handler, "127.0.0.1", 0, ping_interval=None)
self.port = self.server.sockets[0].getsockname()[1]
return self

async def __aexit__(self, *args):
if self.server:
self.server.close()
await self.server.wait_closed()


class TestRealtimeConnection(BaseAsyncTestCase):
@pytest.fixture(autouse=True)
async def setup(self):
Expand Down Expand Up @@ -469,3 +539,37 @@ async def test_queue_messages_defaults_to_true(self):
# TO3g: queueMessages defaults to true
assert ably.options.queue_messages is True
assert ably.connection.connection_manager.options.queue_messages is True

async def test_normal_ws_close_triggers_immediate_reconnection(self):
"""Server normal WS close (code 1000) must trigger immediate reconnection.

Regression test: ConnectionClosedOK was silently swallowed and deactivate_transport
was never called, leaving the client disconnected until the idle timer fired.
"""
async with WsProxy(self.test_vars["host"]) as proxy:
ably = await TestApp.get_ably_realtime(
disconnected_retry_timeout=500_000,
suspended_retry_timeout=500_000,
tls=False,
endpoint=proxy.endpoint,
)

try:
await asyncio.wait_for(
ably.connection.once_async(ConnectionState.CONNECTED), timeout=10
)

# Simulate server sending a normal WS close frame
await proxy.close_active_connection()

# Must go CONNECTING quickly — not after the 25 s idle timer
await asyncio.wait_for(
ably.connection.once_async(ConnectionState.CONNECTING), timeout=1
)

# Must reconnect immediately — not after the 500 s retry timer
await asyncio.wait_for(
ably.connection.once_async(ConnectionState.CONNECTED), timeout=10
)
finally:
await ably.close()
Loading