From 3acd52a6ab862c9ea4113212b1b93e00eab8b90d Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Thu, 20 Nov 2025 19:50:52 +0900 Subject: [PATCH 1/5] fix1 --- lib/src/core/room.dart | 17 +++++++++-------- lib/src/participant/remote.dart | 5 ++++- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 477c551c..cad844f1 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -724,14 +724,15 @@ class Room extends DisposableChangeNotifier with EventsEmittable { // Emit connected event emitWhenConnected(ParticipantConnectedEvent(participant: result.participant)); // Emit TrackPublishedEvent for each new track - if (connectionState == ConnectionState.connected) { - for (final pub in result.newPublications) { - final event = TrackPublishedEvent( - participant: result.participant, - publication: pub, - ); - [result.participant.events, events].emit(event); - } + for (final pub in result.newPublications) { + final event = TrackPublishedEvent( + participant: result.participant, + publication: pub, + ); + // Always emit to participant.events (internal, for addSubscribedMediaTrack) + result.participant.events.emit(event); + // Only emit to room events when connected (external, for apps) + emitWhenConnected(event); } _sidToIdentity[info.sid] = info.identity; } else { diff --git a/lib/src/participant/remote.dart b/lib/src/participant/remote.dart index ab4fa7f2..1bc02a8b 100644 --- a/lib/src/participant/remote.dart +++ b/lib/src/participant/remote.dart @@ -278,8 +278,11 @@ class RemoteParticipant extends Participant { participant: this, publication: pub, ); + // Always emit to participant.events (internal, for addSubscribedMediaTrack) + events.emit(event); + // Only emit to room events when connected (external, for apps) if (room.connectionState == ConnectionState.connected) { - [events, room.events].emit(event); + room.events.emit(event); } } From 5a915b7260808a897568543b56593ec9dd30e126 Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Thu, 20 Nov 2025 19:58:10 +0900 Subject: [PATCH 2/5] Rename emitWhenConnected to emitIfConnected Replaces all occurrences of emitWhenConnected with emitIfConnected for improved method naming consistency and clarity in the Room class and its extensions. --- lib/src/core/room.dart | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index cad844f1..ccb13568 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -371,7 +371,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { } // await publication.updateSubscriptionAllowed(event.allowed); - emitWhenConnected(TrackSubscriptionPermissionChangedEvent( + emitIfConnected(TrackSubscriptionPermissionChangedEvent( participant: participant, publication: publication, state: publication.subscriptionState, @@ -380,10 +380,10 @@ class Room extends DisposableChangeNotifier with EventsEmittable { ..on((event) async { _metadata = event.room.metadata; _roomInfo = event.room; - emitWhenConnected(RoomMetadataChangedEvent(metadata: event.room.metadata)); + emitIfConnected(RoomMetadataChangedEvent(metadata: event.room.metadata)); if (_isRecording != event.room.activeRecording) { _isRecording = event.room.activeRecording; - emitWhenConnected(RoomRecordingStatusChanged(activeRecording: _isRecording)); + emitIfConnected(RoomRecordingStatusChanged(activeRecording: _isRecording)); } }) ..on((event) async { @@ -416,7 +416,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { if (_isRecording != event.response.room.activeRecording) { _isRecording = event.response.room.activeRecording; - emitWhenConnected(RoomRecordingStatusChanged(activeRecording: _isRecording)); + emitIfConnected(RoomRecordingStatusChanged(activeRecording: _isRecording)); } logger.fine('[Engine] Received JoinResponse, ' @@ -722,7 +722,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { if (isNew) { hasChanged = true; // Emit connected event - emitWhenConnected(ParticipantConnectedEvent(participant: result.participant)); + emitIfConnected(ParticipantConnectedEvent(participant: result.participant)); // Emit TrackPublishedEvent for each new track for (final pub in result.newPublications) { final event = TrackPublishedEvent( @@ -732,7 +732,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { // Always emit to participant.events (internal, for addSubscribedMediaTrack) result.participant.events.emit(event); // Only emit to room events when connected (external, for apps) - emitWhenConnected(event); + emitIfConnected(event); } _sidToIdentity[info.sid] = info.identity; } else { @@ -770,7 +770,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { final activeSpeakers = lastSpeakers.values.toList(); activeSpeakers.sort((a, b) => b.audioLevel.compareTo(a.audioLevel)); _activeSpeakers = activeSpeakers; - emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); + emitIfConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); } // from data channel @@ -803,7 +803,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { } _activeSpeakers = activeSpeakers; - emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); + emitIfConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); } void _onSignalConnectionQualityUpdateEvent(List updates) { @@ -837,7 +837,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { if (trackPublication == null) continue; // update the stream state await trackPublication.updateStreamState(update.state.toLKType()); - emitWhenConnected(TrackStreamStateUpdatedEvent( + emitIfConnected(TrackStreamStateUpdatedEvent( participant: participant, publication: trackPublication, streamState: update.state.toLKType(), @@ -907,7 +907,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { await participant.removeAllPublishedTracks(notify: true); - emitWhenConnected(ParticipantDisconnectedEvent(participant: participant)); + emitIfConnected(ParticipantDisconnectedEvent(participant: participant)); return true; } @@ -980,7 +980,7 @@ extension RoomPrivateMethods on Room { } @internal - void emitWhenConnected(RoomEvent event) { + void emitIfConnected(RoomEvent event) { if (connectionState == ConnectionState.connected) { events.emit(event); } From ad54a925c47d38f2994d1c2d2ce61bd9b870e2fc Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Thu, 20 Nov 2025 20:28:43 +0900 Subject: [PATCH 3/5] fix2 --- lib/src/core/room.dart | 15 +++++++++------ lib/src/internal/events.dart | 21 +++++++++++++++++++++ lib/src/participant/remote.dart | 20 ++++++++++++-------- 3 files changed, 42 insertions(+), 14 deletions(-) diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index ccb13568..0ad34738 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -725,14 +725,17 @@ class Room extends DisposableChangeNotifier with EventsEmittable { emitIfConnected(ParticipantConnectedEvent(participant: result.participant)); // Emit TrackPublishedEvent for each new track for (final pub in result.newPublications) { - final event = TrackPublishedEvent( + // Always emit internal event (for addSubscribedMediaTrack) + result.participant.events.emit(InternalTrackPublishedEvent( participant: result.participant, publication: pub, - ); - // Always emit to participant.events (internal, for addSubscribedMediaTrack) - result.participant.events.emit(event); - // Only emit to room events when connected (external, for apps) - emitIfConnected(event); + )); + + // Only emit public event when connected (for apps) + emitIfConnected(TrackPublishedEvent( + participant: result.participant, + publication: pub, + )); } _sidToIdentity[info.sid] = info.identity; } else { diff --git a/lib/src/internal/events.dart b/lib/src/internal/events.dart index 71fb5781..00eb8736 100644 --- a/lib/src/internal/events.dart +++ b/lib/src/internal/events.dart @@ -18,8 +18,10 @@ import 'package:meta/meta.dart'; import '../e2ee/options.dart'; import '../events.dart'; +import '../participant/remote.dart' show RemoteParticipant; import '../proto/livekit_models.pb.dart' as lk_models; import '../proto/livekit_rtc.pb.dart' as lk_rtc; +import '../publication/remote.dart' show RemoteTrackPublication; import '../track/local/local.dart'; import '../track/options.dart'; import '../track/track.dart'; @@ -306,6 +308,25 @@ class SignalLocalTrackPublishedEvent with SignalEvent, InternalEvent { }); } +/// Internal event for track publication metadata arrival. +/// Used by addSubscribedMediaTrack to wait for publication metadata. +/// This event always fires regardless of connection state. +/// Apps should listen to TrackPublishedEvent instead (only fires when connected). +@internal +class InternalTrackPublishedEvent with ParticipantEvent, InternalEvent { + final RemoteParticipant participant; + final RemoteTrackPublication publication; + + const InternalTrackPublishedEvent({ + required this.participant, + required this.publication, + }); + + @override + String toString() => '${runtimeType}' + '(participant: ${participant}, publication: ${publication})'; +} + @internal class SignalTrackUnpublishedEvent with SignalEvent, InternalEvent { final String trackSid; diff --git a/lib/src/participant/remote.dart b/lib/src/participant/remote.dart index 1bc02a8b..780917b8 100644 --- a/lib/src/participant/remote.dart +++ b/lib/src/participant/remote.dart @@ -189,8 +189,8 @@ class RemoteParticipant extends Participant { if (pub == null) { logger.fine('addSubscribedMediaTrack() pub is null, will wait...'); logger.fine('addSubscribedMediaTrack() tracks: $trackPublications'); - // Wait for the metadata to arrive - final event = await events.waitFor( + // Wait for the metadata to arrive (using internal event) + final event = await events.waitFor( filter: (event) => event.participant == this && event.publication.sid == trackSid, duration: room.connectOptions.timeouts.publish, onTimeout: () => throw TrackSubscriptionExceptionEvent( @@ -274,15 +274,19 @@ class RemoteParticipant extends Participant { // Emit events for new publications for (final pub in newPubs) { - final event = TrackPublishedEvent( + // Always emit internal event (for addSubscribedMediaTrack) + events.emit(InternalTrackPublishedEvent( participant: this, publication: pub, - ); - // Always emit to participant.events (internal, for addSubscribedMediaTrack) - events.emit(event); - // Only emit to room events when connected (external, for apps) + )); + + // Only emit public event when connected (for apps) if (room.connectionState == ConnectionState.connected) { - room.events.emit(event); + final event = TrackPublishedEvent( + participant: this, + publication: pub, + ); + [events, room.events].emit(event); } } From cdd3f5f2df94478a08fa41fb9c289d910aed181b Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Fri, 21 Nov 2025 13:13:02 +0900 Subject: [PATCH 4/5] Create fix-subscribed-event --- .changes/fix-subscribed-event | 1 + 1 file changed, 1 insertion(+) create mode 100644 .changes/fix-subscribed-event diff --git a/.changes/fix-subscribed-event b/.changes/fix-subscribed-event new file mode 100644 index 00000000..26c6c84f --- /dev/null +++ b/.changes/fix-subscribed-event @@ -0,0 +1 @@ +patch type="fixed" "Fix race condition causing track subscriptions to fail when metadata arrives before connection completes" From 36472c02d971fcb072063298f212cb92a4cdf45c Mon Sep 17 00:00:00 2001 From: Hiroshi Horie <548776+hiroshihorie@users.noreply.github.com> Date: Sat, 22 Nov 2025 06:03:09 +0900 Subject: [PATCH 5/5] fix3 --- lib/src/core/room.dart | 47 ++++++++++++++++++++++++++++++------ lib/src/internal/events.dart | 17 +++++++++++++ 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/lib/src/core/room.dart b/lib/src/core/room.dart index 521eaaf0..c2f4454b 100644 --- a/lib/src/core/room.dart +++ b/lib/src/core/room.dart @@ -588,7 +588,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable { trackSid = streamId; } - final participant = _getRemoteParticipantBySid(participantSid); + var participant = _getRemoteParticipantBySid(participantSid); try { if (trackSid == null || trackSid.isEmpty) { throw TrackSubscriptionExceptionEvent( @@ -597,11 +597,26 @@ class Room extends DisposableChangeNotifier with EventsEmittable { ); } if (participant == null) { - throw TrackSubscriptionExceptionEvent( - participant: participant, - sid: trackSid, - reason: TrackSubscribeFailReason.noParticipantFound, - ); + logger.fine('EngineTrackAddedEvent participant is null, waiting for participant metadata...'); + // Wait for participant metadata to arrive + try { + final availableEvent = await events.waitFor( + filter: (event) => event.participantSid == participantSid, + duration: connectOptions.timeouts.publish, + ); + participant = availableEvent.participant; + logger.fine('EngineTrackAddedEvent participant metadata received'); + } catch (e) { + logger.severe('EngineTrackAddedEvent timeout waiting for participant metadata: $e'); + } + + if (participant == null) { + throw TrackSubscriptionExceptionEvent( + participant: participant, + sid: trackSid, + reason: TrackSubscribeFailReason.noParticipantFound, + ); + } } await participant.addSubscribedMediaTrack( event.track, @@ -611,11 +626,11 @@ class Room extends DisposableChangeNotifier with EventsEmittable { audioOutputOptions: roomOptions.defaultAudioOutputOptions, ); } on TrackSubscriptionExceptionEvent catch (event) { - logger.severe('addSubscribedMediaTrack() throwed ${event}'); + logger.severe('Track subscription failed: ${event}'); events.emit(event); } catch (exception) { // We don't want to pass up any exception so catch everything here. - logger.warning('Unknown exception on addSubscribedMediaTrack() ${exception}'); + logger.warning('Unknown exception during track subscription: ${exception}'); } }); @@ -678,6 +693,12 @@ class Room extends DisposableChangeNotifier with EventsEmittable { _remoteParticipants[result.participant.identity] = result.participant; _sidToIdentity[result.participant.sid] = result.participant.identity; + + // Emit internal event for tracks waiting for participant metadata + events.emit(InternalParticipantAvailableEvent( + participant: result.participant, + )); + return result; } @@ -726,10 +747,20 @@ class Room extends DisposableChangeNotifier with EventsEmittable { )); } _sidToIdentity[info.sid] = info.identity; + + // Emit internal event for tracks waiting for participant metadata + events.emit(InternalParticipantAvailableEvent( + participant: result.participant, + )); } else { final wasUpdated = await result.participant.updateFromInfo(info); if (wasUpdated) { _sidToIdentity[info.sid] = info.identity; + + // Emit internal event for tracks waiting for participant metadata + events.emit(InternalParticipantAvailableEvent( + participant: result.participant, + )); } } } diff --git a/lib/src/internal/events.dart b/lib/src/internal/events.dart index ca4ab747..5b2f6c7b 100644 --- a/lib/src/internal/events.dart +++ b/lib/src/internal/events.dart @@ -420,6 +420,23 @@ class InternalTrackPublishedEvent with ParticipantEvent, InternalEvent { '(participant: ${participant}, publication: ${publication})'; } +/// Internal event fired when a participant becomes available (added to _sidToIdentity map). +/// Used by EngineTrackAddedEvent handler to wait for participant metadata when tracks arrive +/// before participant info is processed from JoinResponse or ParticipantUpdate. +@internal +class InternalParticipantAvailableEvent with RoomEvent, InternalEvent { + final RemoteParticipant participant; + + const InternalParticipantAvailableEvent({ + required this.participant, + }); + + String get participantSid => participant.sid; + + @override + String toString() => '${runtimeType}(participant: ${participant.sid})'; +} + @internal class SignalTrackUnpublishedEvent with SignalEvent, InternalEvent { final String trackSid;