-
Notifications
You must be signed in to change notification settings - Fork 138
Add support for multiple groups, and fetching them #877
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
And a way to fetch them.
TODO Add a timeout.
WalkthroughChanges adjust track ownership to clone rather than consume, and update call sites accordingly. Publisher task management now uses a FuturesUnordered pool for concurrent group handlers. Broadcast state was restructured (renamed maps) and 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rs/moq-lite/src/model/track.rs (1)
86-96: Return value doesn't match documentation.The method documentation states it returns "true if this is the latest group," but the implementation always returns
trueregardless of whether the inserted group has the highest sequence number. If a group with a lower sequence is inserted after a higher one, this would incorrectly returntrue.Suggested fix
pub fn insert_group(&mut self, group: GroupConsumer) -> bool { - self.state.send_if_modified(|state| { + let sequence = group.info.sequence; + self.state.send_if_modified(|state| { assert!(state.closed.is_none()); let now = tokio::time::Instant::now(); state.trim(now); state.groups.push_back((now, group.clone())); state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.info.sequence)); true - }) + }); + self.state.borrow().max_sequence == Some(sequence) }
🤖 Fix all issues with AI agents
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 219-243: The select currently biases track.next_group() first so
when groups keep arriving the FuturesUnordered "tasks" never get polled and
serve_group futures starve; fix by reordering the tokio::select! branches so the
branch that polls `tasks` (the `true = async { while
tasks.next().await.is_some() {} false }` block) comes before
`track.next_group().transpose()`, leaving the rest (the unreachable!() and else
=> return Ok(())) unchanged; this ensures `tasks` (the `FuturesUnordered` named
`tasks`) is polled regularly and `Self::serve_group(...)` futures make progress
while still accepting new groups and using `priority.insert(...)` as before.
🧹 Nitpick comments (6)
rs/moq-lite/src/model/broadcast.rs (2)
82-89: Consider consistency between consumer and producer map insertions.The method inserts into both
consumersandproducersmaps but only returns whether the producer was unique. If the same track name is inserted twice, the consumer is silently replaced while the return value indicates the producer wasn't unique. This asymmetry might be intentional, but if both maps should stay in sync, consider returning a more informative result or documenting this behavior.
438-439: Minor: Consider using a more deterministic synchronization mechanism.The 1ms sleep for waiting on the cleanup task is acknowledged with a TODO. While functional, this introduces timing-dependent behavior in tests that could be flaky on slower systems.
rs/moq-lite/src/model/track.rs (1)
212-246: Documentation slightly misleading on dropped group behavior.The doc comment states "This can block indefinitely if the requested group is dropped," but the implementation actually returns
Ok(None)once it determines the group was dropped (whendrop_sequence >= sequence). Consider updating the comment to clarify the actual behavior.Suggested doc update
/// Block until the group is available. /// -/// NOTE: This can block indefinitely if the requested group is dropped. +/// NOTE: Returns `None` if the group was already dropped from the cache. +/// May block indefinitely if the group hasn't been produced yet and never will be. pub async fn get_group(&self, sequence: u64) -> Result<Option<GroupConsumer>> {rs/moq-relay/src/web.rs (3)
316-323: Path parsing edge case with empty segments.The path splitting may produce unexpected results with trailing slashes or double slashes. For example,
/fetch/broadcast//trackwould produce["broadcast", "", "track"], and after pop,pathwould be["broadcast", ""], resulting inbroadcast = "broadcast/".Consider using
filter(|s| !s.is_empty())to handle edge cases:Suggested improvement
-let mut path: Vec<&str> = path.split("/").collect(); +let mut path: Vec<&str> = path.split("/").filter(|s| !s.is_empty()).collect();
343-343: Hardcoded timeout duplicates track.rs constant.The 30-second deadline here matches
MAX_CACHEintrack.rs. Consider extracting this to a shared constant or making it configurable to prevent the values from drifting out of sync. This aligns with the PR's TODO about making the delivery timeout configurable.
47-67: Minor: Inconsistent argument ID naming.The HTTP config uses
id = "http-listen"while HTTPS usesid = "web-https-listen". Consider aligning these for consistency (either both withweb-prefix or neither).
TODO add support for delivery timeout so it's not hard-coded to 30 seconds