CMAF passthrough attempt v3 #867
This change adds support for CMAF (fMP4) passthrough mode, where moof+mdat
fragments are sent directly to the player without parsing individual samples.
This enables lower latency and CPU usage when players support CMAF.
Catalog changes:
- Add Container type as discriminated union: { kind: "legacy" } | { kind: "cmaf", timescale, trackId }
- Add minBuffer field to VideoConfig and AudioConfig for buffering hints
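The catalog changes above can be sketched in TypeScript. This is an illustrative model of the described discriminated union, not the actual catalog schema; the field names (`kind`, `timescale`, `trackId`) come from the bullet above.

```typescript
// Hypothetical sketch of the Container union described in the PR summary.
type Container =
	| { kind: "legacy" }
	| { kind: "cmaf"; timescale: number; trackId: number };

// Narrowing on `kind` lets a consumer pick the legacy frame path or the
// CMAF passthrough path.
function describe(container: Container): string {
	if (container.kind === "cmaf") {
		return `cmaf (timescale=${container.timescale}, track=${container.trackId})`;
	}
	return "legacy";
}

console.log(describe({ kind: "cmaf", timescale: 90000, trackId: 1 }));
```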
CLI changes:
- Add --passthrough flag for fmp4 and hls subcommands
- hang fmp4 --passthrough: Pass through fMP4 fragments directly
- hang hls --passthrough: Pass through HLS fMP4 segments directly
Rust import changes:
- Fmp4::new() now takes Fmp4Config with passthrough option
- HlsConfig now has passthrough option
- Add decode_from() async method to Avc3 and Fmp4 importers
- Passthrough mode writes moof+mdat as single frame per track
- Automatically compute min_buffer from fragment duration
TypeScript changes:
- Add mp4 module with decodeDataSegment() for parsing fMP4 fragments
- Update video/audio sources to handle CMAF container type
- Decode moof+mdat fragments into EncodedVideoChunk/EncodedAudioChunk
- Rename "latency" to "buffer" in source props and element attributes
- Add Latency utility: effective latency = catalog.minBuffer + buffer
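The effective-latency rule above is simple arithmetic; a minimal sketch (the function name and `Milli` alias are illustrative, not the real Latency class API):

```typescript
type Milli = number;

// effective latency = publisher hint (catalog.minBuffer) + user-controlled buffer.
function effectiveLatency(minBuffer: Milli | undefined, buffer: Milli): Milli {
	// Nullish coalescing so an absent hint contributes nothing,
	// while an explicit 0 is respected.
	return (minBuffer ?? 0) + buffer;
}

console.log(effectiveLatency(2000, 100)); // e.g. segment-duration hint + jitter buffer
```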
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Walkthrough

The changes rename and rewire latency into a user-controlled buffer plus a computed latency (catalog minBuffer + buffer) via a new Latency helper and signal plumbing across UI, watch, audio, and video paths. The container model moves from a simple enum to a discriminated Container (legacy | cmaf) with CMAF parsing/rewriting added (JS cmaf module) and container-aware producers/consumers replacing the prior Frame paths. Catalog schemas (audio/video) and Rust types gain container and min_buffer fields; fMP4/CMAF passthrough, decoder streaming, and dependency updates are included.

Pre-merge checks: 3 of 3 passed.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
js/hang/src/watch/audio/source.ts (1)
141-146: Buffer changes won’t propagate to audio latency after init.

`#latency.peek()` is captured once for the worklet init and for the consumer’s latency. Since the effects don’t depend on `#latency`, changes from the buffer slider won’t take effect (and can desync audio vs. video, which does read latency per frame). Consider making the effect depend on `#latency.combined`, or add a worklet message to update latency dynamically.

✅ One option: make the effects react to latency changes

```diff
 const sampleRate = config.sampleRate;
 const channelCount = config.numberOfChannels;
+const latency = effect.get(this.#latency.combined);
 const init: Render.Init = {
 	type: "init",
 	rate: sampleRate,
 	channels: channelCount,
-	latency: this.#latency.peek(), // TODO make it reactive
+	latency,
 };
```

```diff
 #runLegacyDecoder(effect: Effect, sub: Moq.Track, config: Catalog.AudioConfig): void {
+	const latency = effect.get(this.#latency.combined);
 	// Create consumer with slightly less latency than the render worklet to avoid underflowing.
 	const consumer = new Frame.Consumer(sub, {
-		latency: Math.max(this.#latency.peek() - JITTER_UNDERHEAD, 0) as Time.Milli,
+		latency: Math.max(latency - JITTER_UNDERHEAD, 0) as Time.Milli,
 	});
```

Also applies to: 191-195
rs/hang/src/import/hls.rs (1)
181-202: Avoid re-fetching the playlist for init; use the same playlist that selected the segment.

Lines 476-482: `push_segment` refetches the playlist to obtain EXT‑X‑MAP when `init_ready` is false. That can select a different init segment than the one associated with the already-chosen segment list. It also adds extra network I/O. Prefer ensuring init once inside `consume_segments_limited` using the playlist already in hand, then let `push_segment` assume init is ready.

🛠️ Suggested fix

```diff
 async fn consume_segments_limited(
 	&mut self,
 	kind: TrackKind,
 	track: &mut TrackState,
 	playlist: &MediaPlaylist,
 	max_segments: usize,
 ) -> anyhow::Result<usize> {
+	self.ensure_init_segment(kind, track, playlist).await?;
+
 	// Calculate segments to process
 	let next_seq = track.next_sequence.unwrap_or(0);
 	let playlist_seq = playlist.media_sequence;
 	let total_segments = playlist.segments.len();
@@
 async fn push_segment(
 	&mut self,
 	kind: TrackKind,
 	track: &mut TrackState,
 	segment: &MediaSegment,
 	sequence: u64,
 ) -> anyhow::Result<()> {
@@
-	// Ensure the importer is initialized before processing fragments
-	// Use track.init_ready to avoid borrowing issues
-	if !track.init_ready {
-		// Try to ensure init segment is processed
-		let playlist = self.fetch_media_playlist(track.playlist.clone()).await?;
-		self.ensure_init_segment(kind, track, &playlist).await?;
-	}
+	if !track.init_ready {
+		anyhow::bail!("init segment not processed before media segments");
+	}
```

Also applies to: 321-363, 476-482
🤖 Fix all issues with AI agents
In `@js/hang/src/mp4/decode.ts`:
- Around line 243-262: The loop over trun.samples can produce zero-length
samples or read past mdatData because sampleSize/sampleDuration may default to 0
and there's no bounds check on dataOffset; update the TrackRunSample parsing in
the function handling trun (use symbols trun, sample, sampleSize, defaultSize,
sampleDuration, defaultDuration, dataOffset, mdatData) to validate that
sampleSize and sampleDuration are > 0 (or skip/throw for invalid samples),
ensure before slicing that dataOffset + sampleSize <= mdatData.length and raise
an error if it would overflow, and when skipping/throwing make sure dataOffset
is advanced appropriately or processing for this trun aborts to avoid an
infinite loop/emission of invalid Encoded*Chunk; also confirm any
trun.dataOffset semantics from readMdat() are respected when computing
dataOffset.
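The validation the prompt asks for can be sketched as follows. The `RunSample` shape, default-size/duration handling, and `mdatData` layout are assumptions based on the prompt, not quotes from `decode.ts`:

```typescript
interface RunSample {
	size?: number;
	duration?: number;
}

function sliceSamples(
	samples: RunSample[],
	defaultSize: number,
	defaultDuration: number,
	mdatData: Uint8Array,
): Uint8Array[] {
	const out: Uint8Array[] = [];
	let dataOffset = 0;
	for (const sample of samples) {
		const sampleSize = sample.size ?? defaultSize;
		const sampleDuration = sample.duration ?? defaultDuration;
		// Reject zero-length samples instead of emitting empty chunks
		// (or looping forever without advancing dataOffset).
		if (sampleSize <= 0 || sampleDuration <= 0) {
			throw new Error(`invalid sample: size=${sampleSize} duration=${sampleDuration}`);
		}
		// Bounds-check before slicing so a bad trun cannot read past mdat.
		if (dataOffset + sampleSize > mdatData.length) {
			throw new Error("sample overflows mdat payload");
		}
		out.push(mdatData.subarray(dataOffset, dataOffset + sampleSize));
		dataOffset += sampleSize;
	}
	return out;
}
```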
In `@js/hang/src/util/latency.ts`:
- Around line 39-45: The current logic treats an explicit minBuffer = 0 as
missing because it uses a truthy check; change the checks to use nullish checks
and validate framerate: when initializing minBuffer (the variable named
minBuffer and the config.framerate usage), only treat config.minBuffer as
missing if it is null or undefined (use a nullish check) and only estimate
minBuffer from config.framerate if framerate is a finite positive number (e.g.,
> 0); likewise update the similar block referenced around the other occurrence
(lines 47-48) so you do not override an explicit 0 and you guard against invalid
framerate values.
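The nullish-vs-truthy distinction above can be shown in a few lines. This is an illustrative helper, not the actual `latency.ts` code; only the field names (`minBuffer`, `framerate`) come from the prompt:

```typescript
function resolveMinBuffer(config: { minBuffer?: number; framerate?: number }): number {
	// `!= null` keeps an explicit minBuffer of 0, unlike a truthy check.
	if (config.minBuffer != null) return config.minBuffer;
	// Only estimate from framerate when it is a finite positive number.
	if (Number.isFinite(config.framerate) && (config.framerate as number) > 0) {
		return 1000 / (config.framerate as number); // one frame duration in ms
	}
	return 0;
}
```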
In `@js/hang/src/watch/element.ts`:
- Around line 203-205: The buffer setter currently does
this.buffer.set((newValue ? Number.parseFloat(newValue) : 100) as Time.Milli)
and can accept NaN or negative numbers; change it to parse newValue with
Number.parseFloat, validate that the result is a finite number and >= 0 (or > 0
if zero is invalid), and if not, fall back to the default (100); then call
this.buffer.set(validatedValue as Time.Milli). Ensure you reference the existing
check for name === "buffer" and the this.buffer.set call when making the change.
In `@rs/hang/src/catalog/audio/mod.rs`:
- Around line 66-78: Add serde default handling for the container field so old
catalogs deserialize with the legacy value: add #[serde(default)] to the pub
container: Container field and ensure a default is provided (either implement
Default for the Container enum returning Container::Legacy or add a small
function like default_container() -> Container that returns Container::Legacy
and use #[serde(default = "default_container")]). Update the Container type to
provide that Default/func so deserialization falls back to Container::Legacy.
In `@rs/hang/src/catalog/video/mod.rs`:
- Around line 115-127: The struct's `container: Container` field lacks a serde
default, so deserializing older catalogs without this field will fail; add a
serde default for `container` (either `#[serde(default)]` and implement `Default
for Container` returning `Container::Legacy`, or use `#[serde(default =
"default_container")]` with a `fn default_container() -> Container {
Container::Legacy }`) and ensure the default value is `Container::Legacy`.
In `@rs/hang/src/import/avc3.rs`:
- Around line 118-125: The decode_from function currently reads into a BytesMut
buffer and repeatedly calls decode_stream, but never flushes the final buffered
NAL when EOF is reached; after the read loop returns 0, if buffer is non-empty
call the appropriate flush path (e.g., invoke self.decode_frame(&mut buffer) or
call self.decode_stream(&mut buffer, Some(final_end_flag)) depending on the
decoder API) so the trailing NAL is processed; update decode_from to check
buffer.is_empty() after the loop and call the decoder's flush method
(referencing decode_from, decode_stream, and decode_frame) before returning
Ok(()).
In `@rs/hang/src/import/fmp4.rs`:
- Around line 497-503: The calculation for offset using `offset = base_offset +
data_offset - moof_size - header_size` can underflow when `data_offset` is
smaller than `moof_size + header_size`; change this to validate and use checked
arithmetic: ensure `data_offset >= moof_size + header_size` (or use
`checked_sub`) before computing, convert and use `checked_add`/`checked_sub` as
needed when combining `base_offset`, `data_offset`, `moof_size`, and
`header_size`, and return an error (anyhow::bail!) if any checked operation
returns None to prevent unsigned underflow in the `offset` assignment
(referencing variables `offset`, `base_offset`, `data_offset`, `moof_size`,
`header_size`).
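The checked-arithmetic pattern the prompt describes looks roughly like this. Variable names match the prompt; returning `Option` stands in for the `anyhow::bail!` error path in the real importer:

```rust
// Compute base_offset + (data_offset - moof_size - header_size) without
// risking unsigned underflow or overflow.
fn sample_offset(base_offset: u64, data_offset: u64, moof_size: u64, header_size: u64) -> Option<u64> {
    // data_offset must cover the moof box plus its header; otherwise the
    // naive subtraction would wrap around.
    let skipped = moof_size.checked_add(header_size)?;
    let relative = data_offset.checked_sub(skipped)?;
    base_offset.checked_add(relative)
}

fn main() {
    assert_eq!(sample_offset(100, 50, 40, 8), Some(102));
    // data_offset smaller than moof_size + header_size → None instead of wraparound.
    assert_eq!(sample_offset(100, 10, 40, 8), None);
}
```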
🧹 Nitpick comments (3)
js/hang/package.json (1)
37-37: Note: Alpha version dependency.
`@svta/cml-iso-bmff` is pinned to an alpha version (`^1.0.0-alpha.9`). Alpha releases may introduce breaking API changes. Consider pinning to an exact version (`1.0.0-alpha.9` without `^`) to avoid unexpected breakage, or document the rationale for using an unstable release.

js/hang/src/watch/audio/source.ts (1)
257-289: Process CMAF groups sequentially to preserve ordering/backpressure.
Spawning a task per group can interleave samples across groups and create unbounded concurrency under high segment rates. A sequential loop preserves chunk order and keeps resource usage predictable.

♻️ Suggested sequential group processing

```diff
 for (;;) {
 	const group = await sub.nextGroup();
 	if (!group) break;
-	effect.spawn(async () => {
-		try {
-			for (;;) {
-				const segment = await group.readFrame();
-				if (!segment) break;
-				const samples = Mp4.decodeDataSegment(segment, timescale);
-				for (const sample of samples) {
-					this.#stats.update((stats) => ({
-						bytesReceived: (stats?.bytesReceived ?? 0) + sample.data.byteLength,
-					}));
-					const chunk = new EncodedAudioChunk({
-						type: sample.keyframe ? "key" : "delta",
-						data: sample.data,
-						timestamp: sample.timestamp,
-					});
-					decoder.decode(chunk);
-				}
-			}
-		} finally {
-			group.close();
-		}
-	});
+	try {
+		for (;;) {
+			const segment = await group.readFrame();
+			if (!segment) break;
+			const samples = Mp4.decodeDataSegment(segment, timescale);
+			for (const sample of samples) {
+				this.#stats.update((stats) => ({
+					bytesReceived: (stats?.bytesReceived ?? 0) + sample.data.byteLength,
+				}));
+				const chunk = new EncodedAudioChunk({
+					type: sample.keyframe ? "key" : "delta",
+					data: sample.data,
+					timestamp: sample.timestamp,
+				});
+				decoder.decode(chunk);
+			}
+		}
+	} finally {
+		group.close();
+	}
 }
```

js/hang/src/watch/video/source.ts (1)
386-423: Process CMAF groups sequentially to preserve ordering/backpressure.
The per-group spawn can interleave samples across groups and grow concurrency unexpectedly. A sequential loop keeps timestamps ordered and avoids resource spikes.

♻️ Suggested sequential group processing

```diff
 for (;;) {
 	const group = await sub.nextGroup();
 	if (!group) break;
-	effect.spawn(async () => {
-		try {
-			for (;;) {
-				const segment = await group.readFrame();
-				if (!segment) break;
-				const samples = Mp4.decodeDataSegment(segment, timescale);
-				for (const sample of samples) {
-					const chunk = new EncodedVideoChunk({
-						type: sample.keyframe ? "key" : "delta",
-						data: sample.data,
-						timestamp: sample.timestamp,
-					});
-					this.#stats.update((current) => ({
-						frameCount: (current?.frameCount ?? 0) + 1,
-						timestamp: sample.timestamp,
-						bytesReceived: (current?.bytesReceived ?? 0) + sample.data.byteLength,
-					}));
-					decoder.decode(chunk);
-				}
-			}
-		} finally {
-			group.close();
-		}
-	});
+	try {
+		for (;;) {
+			const segment = await group.readFrame();
+			if (!segment) break;
+			const samples = Mp4.decodeDataSegment(segment, timescale);
+			for (const sample of samples) {
+				const chunk = new EncodedVideoChunk({
+					type: sample.keyframe ? "key" : "delta",
+					data: sample.data,
+					timestamp: sample.timestamp,
+				});
+				this.#stats.update((current) => ({
+					frameCount: (current?.frameCount ?? 0) + 1,
+					timestamp: sample.timestamp,
+					bytesReceived: (current?.bytesReceived ?? 0) + sample.data.byteLength,
+				}));
+				decoder.decode(chunk);
+			}
+		}
+	} finally {
+		group.close();
+	}
 }
```
```ts
} else if (name === "buffer") {
	this.buffer.set((newValue ? Number.parseFloat(newValue) : 100) as Time.Milli);
} else {
```
Guard against NaN/negative buffer values.
Parsing user input can yield NaN or negative values, which then flow into timing. Consider sanitizing before setting.
🛠️ Proposed fix

```diff
 } else if (name === "buffer") {
-	this.buffer.set((newValue ? Number.parseFloat(newValue) : 100) as Time.Milli);
+	const parsed = newValue ? Number.parseFloat(newValue) : 100;
+	const safe = Number.isFinite(parsed) && parsed >= 0 ? parsed : 100;
+	this.buffer.set(safe as Time.Milli);
 } else {
```
🤖 Prompt for AI Agents
In `@js/hang/src/watch/element.ts` around lines 203 - 205, The buffer setter
currently does this.buffer.set((newValue ? Number.parseFloat(newValue) : 100) as
Time.Milli) and can accept NaN or negative numbers; change it to parse newValue
with Number.parseFloat, validate that the result is a finite number and >= 0 (or
> 0 if zero is invalid), and if not, fall back to the default (100); then call
this.buffer.set(validatedValue as Time.Milli). Ensure you reference the existing
check for name === "buffer" and the this.buffer.set call when making the change.
```rust
/// Container format for frame encoding.
/// Defaults to "legacy" for backward compatibility.
pub container: Container,

/// Minimum buffer size in milliseconds required for smooth playback.
///
/// This represents the minimum time the player should buffer before starting playback.
/// For HLS imports, this is typically the segment duration.
/// For fMP4 imports, this is detected from the fragment duration.
///
/// The player should add additional jitter buffer on top of this value.
#[serde(default)]
pub min_buffer: Option<moq_lite::Time>,
```
Add a serde default for container to preserve backward compatibility.
Line 66-68: the doc states a legacy default, but without #[serde(default)] older catalogs won’t deserialize. Please add a default and ensure Container::Legacy is the default value.
🛠️ Suggested fix

```diff
 /// Container format for frame encoding.
 /// Defaults to "legacy" for backward compatibility.
+#[serde(default)]
 pub container: Container,
```

🤖 Prompt for AI Agents
In `@rs/hang/src/catalog/audio/mod.rs` around lines 66 - 78, Add serde default
handling for the container field so old catalogs deserialize with the legacy
value: add #[serde(default)] to the pub container: Container field and ensure a
default is provided (either implement Default for the Container enum returning
Container::Legacy or add a small function like default_container() -> Container
that returns Container::Legacy and use #[serde(default = "default_container")]).
Update the Container type to provide that Default/func so deserialization falls
back to Container::Legacy.
```rust
/// Container format for frame encoding.
/// Defaults to "legacy" for backward compatibility.
pub container: Container,

/// Minimum buffer size in milliseconds required for smooth playback.
///
/// This represents the minimum time the player should buffer before starting playback.
/// For HLS imports, this is typically the segment duration.
/// For fMP4 imports, this is detected from the fragment duration.
///
/// The player should add additional jitter buffer on top of this value.
#[serde(default)]
pub min_buffer: Option<moq_lite::Time>,
```
Add a serde default for container to preserve backward compatibility.
Line 115-117: the doc says “Defaults to legacy,” but without #[serde(default)] older catalogs lacking this field will fail to deserialize. Please add a default and ensure Container::Legacy is the default value.
🛠️ Suggested fix

```diff
 /// Container format for frame encoding.
 /// Defaults to "legacy" for backward compatibility.
+#[serde(default)]
 pub container: Container,
```
🤖 Prompt for AI Agents
In `@rs/hang/src/catalog/video/mod.rs` around lines 115 - 127, The struct's
`container: Container` field lacks a serde default, so deserializing older
catalogs without this field will fail; add a serde default for `container`
(either `#[serde(default)]` and implement `Default for Container` returning
`Container::Legacy`, or use `#[serde(default = "default_container")]` with a `fn
default_container() -> Container { Container::Legacy }`) and ensure the default
value is `Container::Legacy`.
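One way to provide that default is to derive it with the legacy variant marked `#[default]`, so `#[serde(default)]` on the field falls back to `Container::Legacy`. The enum shape below is assumed from the review, not copied from the crate:

```rust
#[derive(Debug, Default, PartialEq)]
enum Container {
    // Deserialization of old catalogs without a `container` field
    // falls back to this variant via #[serde(default)].
    #[default]
    Legacy,
    #[allow(dead_code)]
    Cmaf,
}

fn main() {
    assert_eq!(Container::default(), Container::Legacy);
}
```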
```rust
/// Decode from an asynchronous reader.
pub async fn decode_from<T: AsyncRead + Unpin>(&mut self, reader: &mut T) -> anyhow::Result<()> {
	let mut buffer = BytesMut::new();
	while reader.read_buf(&mut buffer).await? > 0 {
		self.decode_stream(&mut buffer, None)?;
	}

	Ok(())
```
Flush the trailing buffer on EOF to avoid dropping the final frame.
Line 118-125: decode_stream needs the next start code, so the final buffered NAL may never flush when the reader ends. Consider flushing the remaining buffer with decode_frame after the read loop.
🛠️ Suggested fix

```diff
 pub async fn decode_from<T: AsyncRead + Unpin>(&mut self, reader: &mut T) -> anyhow::Result<()> {
 	let mut buffer = BytesMut::new();
 	while reader.read_buf(&mut buffer).await? > 0 {
 		self.decode_stream(&mut buffer, None)?;
 	}
+
+	if !buffer.is_empty() {
+		self.decode_frame(&mut buffer, None)?;
+	}
 	Ok(())
 }
```
🤖 Prompt for AI Agents
In `@rs/hang/src/import/avc3.rs` around lines 118 - 125, The decode_from function
currently reads into a BytesMut buffer and repeatedly calls decode_stream, but
never flushes the final buffered NAL when EOF is reached; after the read loop
returns 0, if buffer is non-empty call the appropriate flush path (e.g., invoke
self.decode_frame(&mut buffer) or call self.decode_stream(&mut buffer,
Some(final_end_flag)) depending on the decoder API) so the trailing NAL is
processed; update decode_from to check buffer.is_empty() after the loop and call
the decoder's flush method (referencing decode_from, decode_stream, and
decode_frame) before returning Ok(()).
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
js/hang/src/watch/audio/source.ts (1)
140-145: Stale worklet latency if buffer changes after init.
`latency` is sent once during init; subsequent buffer updates won’t be reflected in the worklet. If buffer is user-adjustable, consider pushing updates or reinitializing the worklet to avoid mismatched jitter handling.
🤖 Fix all issues with AI agents
In `@js/hang/src/watch/audio/source.ts`:
- Around line 238-287: The current code spawns a new task for every group
(effect.spawn inside the for(;;) loop) which allows unbounded concurrency and
can reorder samples; instead process groups sequentially (or implement bounded
backpressure) by removing the inner effect.spawn and awaiting each group's
processing inline: read frames from group.readFrame(), decode with
Container.Cmaf.decodeDataSegment and feed EncodedAudioChunk to decoder.decode
within the same loop, ensure group.close() runs in finally, and keep the
existing decoder lifecycle (decoder.configure, effect.cleanup) intact so groups
are handled one-by-one and ordering/backpressure is preserved.
In `@js/hang/src/watch/video/source.ts`:
- Around line 385-420: The current loop spawns an unbounded effect.spawn per
group (using effect.spawn(...) around the inner group loop), which can grow
concurrency and interleave samples; change to process each group sequentially by
removing the per-group effect.spawn and awaiting/handling the group's readFrame
loop inline after const group = await sub.nextGroup(), keeping the existing
try/finally with group.close(), so Container.Cmaf.decodeDataSegment,
decoder.decode, and this.#stats.update run in-order per group and do not create
unbounded concurrent tasks.
🧹 Nitpick comments (3)
rs/hang/src/import/hls.rs (3)
332-357: Warning is misleading on first playlist fetch.

When `next_sequence` is `None` (initial state), it defaults to `0` via `unwrap_or(0)`. For live streams where `media_sequence` is typically non-zero (e.g., 1000+), this always triggers the "behind playlist" warning on first fetch, even though this is expected behavior. Consider differentiating initial sync from an actual sequence gap:

Suggested fix

```diff
-    let next_seq = track.next_sequence.unwrap_or(0);
     let playlist_seq = playlist.media_sequence;
+    let is_initial = track.next_sequence.is_none();
+    let next_seq = track.next_sequence.unwrap_or(playlist_seq);
     let total_segments = playlist.segments.len();
     let last_playlist_seq = playlist_seq + total_segments as u64;
     let skip = if next_seq > last_playlist_seq {
         warn!(
             ?kind,
             next_sequence = next_seq,
             playlist_sequence = playlist_seq,
             last_playlist_sequence = last_playlist_seq,
             "imported ahead of playlist, waiting for new segments"
         );
         total_segments
     } else if next_seq < playlist_seq {
-        warn!(
-            ?kind,
-            next_sequence = next_seq,
-            playlist_sequence = playlist_seq,
-            "next_sequence behind playlist, resetting to start of playlist"
-        );
-        track.next_sequence = None;
+        if is_initial {
+            debug!(?kind, playlist_sequence = playlist_seq, "initial sync to playlist start");
+        } else {
+            warn!(
+                ?kind,
+                next_sequence = next_seq,
+                playlist_sequence = playlist_seq,
+                "next_sequence behind playlist, resetting to start of playlist"
+            );
+        }
+        track.next_sequence = Some(playlist_seq);
         0
     } else {
         (next_seq - playlist_seq) as usize
     };
```
432-452: Redundant playlist fetch may mask errors.

`consume_segments` already calls `ensure_init_segment` at line 330 before processing any segments. If `track.init_ready` is false here, it means `ensure_init_segment` either failed (which should have propagated an error) or has a bug.

Re-fetching the playlist silently masks the root cause and adds unnecessary network overhead. Consider either:
- Removing this check if the invariant is guaranteed by the call flow, or
- Converting to a debug assertion to catch logic errors during development

Suggested simplification

```diff
-	// Ensure the importer is initialized before processing fragments
-	// Use track.init_ready to avoid borrowing issues
-	if !track.init_ready {
-		// Try to ensure init segment is processed
-		let playlist = self.fetch_media_playlist(track.playlist.clone()).await?;
-		self.ensure_init_segment(kind, track, &playlist).await?;
-	}
-
-	// Get importer after ensuring init segment
+	// The init segment should already be loaded by consume_segments
+	debug_assert!(track.init_ready, "push_segment called before init segment was loaded");
+
 	let importer = match kind {
 		TrackKind::Video(index) => self.ensure_video_importer_for(index),
 		TrackKind::Audio => self.ensure_audio_importer(),
 	};

-	// Final check after ensuring init segment
 	if !importer.is_initialized() {
 		return Err(anyhow::anyhow!(
-			"importer not initialized for {:?} after ensure_init_segment - init segment processing failed",
+			"importer not initialized for {:?} - init segment was not properly loaded",
 			kind
 		));
 	}
```
365-380: Consider demoting per-track consumption logs to debug level.

These `info!` logs fire for each track (video + audio) on every ingest step, which could generate significant log noise in production. The step-level log at lines 165-169 already summarizes the outcome.

Suggested change

```diff
-	info!(
+	debug!(
 		?kind,
 		playlist_sequence = playlist_seq,
 		next_sequence = next_seq,
 		skip = skip,
 		total_segments = total_segments,
 		to_process = to_process,
 		"consuming HLS segments"
 	);

 	if to_process > 0 {
 		let base_seq = playlist_seq + skip as u64;
 		for (i, segment) in playlist.segments[skip..skip + to_process].iter().enumerate() {
 			self.push_segment(kind, track, segment, base_seq + i as u64).await?;
 		}
-		info!(?kind, consumed = to_process, "consumed HLS segments");
+		debug!(?kind, consumed = to_process, "consumed HLS segments");
 	} else {
 		debug!(?kind, "no fresh HLS segments available");
 	}
```
```ts
effect.spawn(async () => {
	const loaded = await libav.polyfill();
	if (!loaded) return; // cancelled

	const decoder = new AudioDecoder({
		output: (data) => this.#emit(data),
		error: (error) => console.error(error),
	});
	effect.cleanup(() => decoder.close());

	// Configure decoder with description from catalog
	decoder.configure({
		codec: config.codec,
		sampleRate: config.sampleRate,
		numberOfChannels: config.numberOfChannels,
		description,
	});

	// Process data segments
	// TODO: Use a consumer wrapper for CMAF to support latency control
	for (;;) {
		const group = await sub.nextGroup();
		if (!group) break;

		effect.spawn(async () => {
			try {
				for (;;) {
					const segment = await group.readFrame();
					if (!segment) break;

					const samples = Container.Cmaf.decodeDataSegment(segment, timescale);

					for (const sample of samples) {
						this.#stats.update((stats) => ({
							bytesReceived: (stats?.bytesReceived ?? 0) + sample.data.byteLength,
						}));

						const chunk = new EncodedAudioChunk({
							type: sample.keyframe ? "key" : "delta",
							data: sample.data,
							timestamp: sample.timestamp,
						});

						decoder.decode(chunk);
					}
				}
			} finally {
				group.close();
			}
		});
```
Avoid unbounded concurrent group processing in CMAF path.
Spawning a task per group without awaiting can lead to unbounded concurrency and out‑of‑order sample delivery. Process groups sequentially (or add bounded queueing/backpressure).
✅ Proposed fix (sequential group processing)
- for (;;) {
- const group = await sub.nextGroup();
- if (!group) break;
-
- effect.spawn(async () => {
- try {
- for (;;) {
- const segment = await group.readFrame();
- if (!segment) break;
-
- const samples = Container.Cmaf.decodeDataSegment(segment, timescale);
-
- for (const sample of samples) {
- this.#stats.update((stats) => ({
- bytesReceived: (stats?.bytesReceived ?? 0) + sample.data.byteLength,
- }));
-
- const chunk = new EncodedAudioChunk({
- type: sample.keyframe ? "key" : "delta",
- data: sample.data,
- timestamp: sample.timestamp,
- });
-
- decoder.decode(chunk);
- }
- }
- } finally {
- group.close();
- }
- });
- }
+ for (;;) {
+ const group = await sub.nextGroup();
+ if (!group) break;
+
+ try {
+ for (;;) {
+ const segment = await group.readFrame();
+ if (!segment) break;
+
+ const samples = Container.Cmaf.decodeDataSegment(segment, timescale);
+
+ for (const sample of samples) {
+ this.#stats.update((stats) => ({
+ bytesReceived: (stats?.bytesReceived ?? 0) + sample.data.byteLength,
+ }));
+
+ const chunk = new EncodedAudioChunk({
+ type: sample.keyframe ? "key" : "delta",
+ data: sample.data,
+ timestamp: sample.timestamp,
+ });
+
+ decoder.decode(chunk);
+ }
+ }
+ } finally {
+ group.close();
+ }
+ }
🤖 Prompt for AI Agents
In `@js/hang/src/watch/audio/source.ts` around lines 238 - 287, The current code
spawns a new task for every group (effect.spawn inside the for(;;) loop) which
allows unbounded concurrency and can reorder samples; instead process groups
sequentially (or implement bounded backpressure) by removing the inner
effect.spawn and awaiting each group's processing inline: read frames from
group.readFrame(), decode with Container.Cmaf.decodeDataSegment and feed
EncodedAudioChunk to decoder.decode within the same loop, ensure group.close()
runs in finally, and keep the existing decoder lifecycle (decoder.configure,
effect.cleanup) intact so groups are handled one-by-one and
ordering/backpressure is preserved.
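If some overlap between network reads and decoding is still desirable, the review's "bounded queueing/backpressure" alternative can be sketched with a small counting semaphore that caps the number of in-flight groups. This is an illustrative sketch only — the Semaphore class, the runBounded helper, and the limit of 2 are assumptions, not part of this PR:

```typescript
// Minimal counting semaphore: at most `limit` tasks hold a permit at once.
class Semaphore {
	#available: number;
	#waiters: (() => void)[] = [];

	constructor(limit: number) {
		this.#available = limit;
	}

	async acquire(): Promise<void> {
		if (this.#available > 0) {
			this.#available--;
			return;
		}
		// No permit free: park until release() wakes us.
		await new Promise<void>((resolve) => this.#waiters.push(resolve));
	}

	release(): void {
		const next = this.#waiters.shift();
		// Hand the permit directly to a waiter, or return it to the pool.
		if (next) next();
		else this.#available++;
	}
}

// Run tasks with bounded concurrency; ordering within each task is preserved,
// but up to `limit` tasks may interleave.
async function runBounded(tasks: (() => Promise<void>)[], limit = 2): Promise<void> {
	const sem = new Semaphore(limit);
	await Promise.all(
		tasks.map(async (task) => {
			await sem.acquire();
			try {
				await task();
			} finally {
				sem.release();
			}
		}),
	);
}
```

Applied to the loop above, each group's processing would become one task, so at most `limit` groups decode concurrently instead of one task per group. Note that bounding concurrency alone does not restore sample ordering across groups; fully sequential processing (as in the proposed fix) remains the simplest correct option.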
effect.spawn(async () => {
	// Process data segments
	// TODO: Use a consumer wrapper for CMAF to support latency control
	for (;;) {
		const group = await sub.nextGroup();
		if (!group) break;

		effect.spawn(async () => {
			try {
				for (;;) {
					const segment = await group.readFrame();
					if (!segment) break;

					const samples = Container.Cmaf.decodeDataSegment(segment, timescale);

					for (const sample of samples) {
						const chunk = new EncodedVideoChunk({
							type: sample.keyframe ? "key" : "delta",
							data: sample.data,
							timestamp: sample.timestamp,
						});

						// Track stats
						this.#stats.update((current) => ({
							frameCount: (current?.frameCount ?? 0) + 1,
							timestamp: sample.timestamp,
							bytesReceived: (current?.bytesReceived ?? 0) + sample.data.byteLength,
						}));

						decoder.decode(chunk);
					}
				}
			} finally {
				group.close();
			}
		});
Avoid unbounded concurrent group processing in CMAF path.
Spawning a task per group without awaiting can grow concurrency and interleave samples. Prefer sequential group handling (or a bounded queue) to maintain ordering and limit memory pressure.
✅ Proposed fix (sequential group processing)
- for (;;) {
- const group = await sub.nextGroup();
- if (!group) break;
-
- effect.spawn(async () => {
- try {
- for (;;) {
- const segment = await group.readFrame();
- if (!segment) break;
-
- const samples = Container.Cmaf.decodeDataSegment(segment, timescale);
-
- for (const sample of samples) {
- const chunk = new EncodedVideoChunk({
- type: sample.keyframe ? "key" : "delta",
- data: sample.data,
- timestamp: sample.timestamp,
- });
-
- this.#stats.update((current) => ({
- frameCount: (current?.frameCount ?? 0) + 1,
- timestamp: sample.timestamp,
- bytesReceived: (current?.bytesReceived ?? 0) + sample.data.byteLength,
- }));
-
- decoder.decode(chunk);
- }
- }
- } finally {
- group.close();
- }
- });
- }
+ for (;;) {
+ const group = await sub.nextGroup();
+ if (!group) break;
+
+ try {
+ for (;;) {
+ const segment = await group.readFrame();
+ if (!segment) break;
+
+ const samples = Container.Cmaf.decodeDataSegment(segment, timescale);
+
+ for (const sample of samples) {
+ const chunk = new EncodedVideoChunk({
+ type: sample.keyframe ? "key" : "delta",
+ data: sample.data,
+ timestamp: sample.timestamp,
+ });
+
+ this.#stats.update((current) => ({
+ frameCount: (current?.frameCount ?? 0) + 1,
+ timestamp: sample.timestamp,
+ bytesReceived: (current?.bytesReceived ?? 0) + sample.data.byteLength,
+ }));
+
+ decoder.decode(chunk);
+ }
+ }
+ } finally {
+ group.close();
+ }
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
effect.spawn(async () => {
	// Process data segments
	// TODO: Use a consumer wrapper for CMAF to support latency control
	for (;;) {
		const group = await sub.nextGroup();
		if (!group) break;

		try {
			for (;;) {
				const segment = await group.readFrame();
				if (!segment) break;

				const samples = Container.Cmaf.decodeDataSegment(segment, timescale);

				for (const sample of samples) {
					const chunk = new EncodedVideoChunk({
						type: sample.keyframe ? "key" : "delta",
						data: sample.data,
						timestamp: sample.timestamp,
					});

					// Track stats
					this.#stats.update((current) => ({
						frameCount: (current?.frameCount ?? 0) + 1,
						timestamp: sample.timestamp,
						bytesReceived: (current?.bytesReceived ?? 0) + sample.data.byteLength,
					}));

					decoder.decode(chunk);
				}
			}
		} finally {
			group.close();
		}
	}
});
🤖 Prompt for AI Agents
In `@js/hang/src/watch/video/source.ts` around lines 385 - 420, The current loop
spawns an unbounded effect.spawn per group (using effect.spawn(...) around the
inner group loop), which can grow concurrency and interleave samples; change to
process each group sequentially by removing the per-group effect.spawn and
awaiting/handling the group's readFrame loop inline after const group = await
sub.nextGroup(), keeping the existing try/finally with group.close(), so
Container.Cmaf.decodeDataSegment, decoder.decode, and this.#stats.update run
in-order per group and do not create unbounded concurrent tasks.
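For context on what a function like Container.Cmaf.decodeDataSegment has to do: an fMP4 data segment is a sequence of ISO BMFF boxes (moof + mdat), each prefixed by a 4-byte big-endian size followed by a 4-character ASCII type. The sketch below is a minimal top-level box walker, illustrative only — a real parser must also descend into moof to read tfhd/tfdt/trun for per-sample timestamps and handle 64-bit largesize boxes:

```typescript
// List the top-level ISO BMFF boxes in a buffer.
// Each box header: 4-byte big-endian size, then a 4-character type code.
function listBoxes(data: Uint8Array): { type: string; size: number; offset: number }[] {
	const boxes: { type: string; size: number; offset: number }[] = [];
	const view = new DataView(data.buffer, data.byteOffset, data.byteLength);
	let offset = 0;
	while (offset + 8 <= data.byteLength) {
		const size = view.getUint32(offset); // big-endian by default
		const type = String.fromCharCode(
			data[offset + 4],
			data[offset + 5],
			data[offset + 6],
			data[offset + 7],
		);
		// Bail on malformed/truncated boxes (size 0/1 means "to end"/largesize,
		// which this sketch does not handle).
		if (size < 8 || offset + size > data.byteLength) break;
		boxes.push({ type, size, offset });
		offset += size;
	}
	return boxes;
}
```

A passthrough consumer could use such a walker to locate each moof/mdat pair before extracting samples from the mdat payload.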
Differences this time:
- Added min_buffer to the catalog to automatically increase latency.
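Given that description, the effective playback latency becomes the catalog's advertised minimum buffer plus the viewer's own buffer setting. A sketch of that computation (the function name and the assumption that both values share the same time unit are mine; the PR's actual Latency helper may differ):

```typescript
// Effective latency = producer-advertised minimum buffer from the catalog
// (absent for legacy containers) plus the viewer's extra buffer knob.
function effectiveLatency(catalogMinBuffer: number | undefined, buffer: number): number {
	return (catalogMinBuffer ?? 0) + buffer;
}
```

This lets a CMAF track with long fragments force enough buffering to play smoothly, while the user-facing attribute only controls the additional headroom.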