Skip to content

Commit a9913af

Browse files
committed
FIX: Fix threadsafety in Live client
1 parent 7f6302c commit a9913af

4 files changed

Lines changed: 23 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
downloading a batch job as a ZIP archive
99
- Added method for `batch.get_job_details` to access the full details of a batch job
1010

11+
#### Bug fixes
12+
- Fixed some thread-unsafe behavior in `Live.start()` and `Live.terminate()` which would
13+
call methods from the client's event loop in the main thread
14+
1115
## 0.78.0 - 2026-05-12
1216

1317
#### Enhancements

databento/live/session.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ def start(self) -> None:
517517
with self._lock:
518518
if self._protocol is None:
519519
raise ValueError("session is not connected")
520-
self._protocol.start()
520+
self._loop.call_soon_threadsafe(self._protocol.start)
521521
self._heartbeat_monitor_task = self._loop.create_task(
522522
self._heartbeat_monitor(),
523523
)
@@ -585,7 +585,7 @@ def terminate(self) -> None:
585585
with self._lock:
586586
if self._transport is None:
587587
return
588-
self._transport.abort()
588+
self._loop.call_soon_threadsafe(self._transport.abort)
589589
self._cleanup()
590590

591591
async def wait_for_close(self) -> None:

tests/test_live_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,7 @@ async def test_live_start_twice(
465465

466466
# Act, Assert
467467
live_client.start()
468+
await mock_live_server.wait_for_message_of_type(message_type=gateway.SessionStart)
468469
with pytest.raises(ValueError):
469470
live_client.start()
470471

tests/test_live_client_reconnect.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ async def test_reconnect_policy_none(
4545
stype_in=SType.RAW_SYMBOL,
4646
symbols="TEST",
4747
)
48-
live_client._session._protocol.disconnected.set_exception(ConnectionResetError)
48+
live_client._loop.call_soon_threadsafe(
49+
live_client._session._protocol.disconnected.set_exception,
50+
ConnectionResetError,
51+
)
4952

5053
await mock_live_server.wait_for_message_of_type(AuthenticationRequest)
5154

@@ -90,7 +93,10 @@ async def test_reconnect_before_start(
9093
live_client.add_callback(records.append)
9194

9295
# Act
93-
live_client._session._protocol.disconnected.set_exception(ConnectionResetError)
96+
live_client._loop.call_soon_threadsafe(
97+
live_client._session._protocol.disconnected.set_exception,
98+
ConnectionResetError,
99+
)
94100
await mock_live_server.disconnect(
95101
session_id=live_client._session.session_id,
96102
)
@@ -162,7 +168,10 @@ async def test_reconnect_subscriptions(
162168
await mock_live_server.wait_for_message_of_type(AuthenticationRequest)
163169

164170
# Act
165-
live_client._session._protocol.disconnected.set_exception(ConnectionResetError)
171+
live_client._loop.call_soon_threadsafe(
172+
live_client._session._protocol.disconnected.set_exception,
173+
ConnectionResetError,
174+
)
166175
await mock_live_server.disconnect(
167176
session_id=live_client._session.session_id,
168177
)
@@ -217,8 +226,11 @@ async def test_reconnect_callback(
217226

218227
# Act
219228
live_client.start()
220-
live_client._session._protocol.disconnected.set_exception(ConnectionResetError)
221229

230+
live_client._loop.call_soon_threadsafe(
231+
live_client._session._protocol.disconnected.set_exception,
232+
ConnectionResetError,
233+
)
222234
await mock_live_server.wait_for_message_of_type(SessionStart)
223235

224236
await live_client.wait_for_close()

0 commit comments

Comments
 (0)