From 82ce1025430edb9d17fed00c219957bca9169203 Mon Sep 17 00:00:00 2001 From: bencmbrook Date: Sun, 29 Oct 2023 01:45:05 -0700 Subject: [PATCH] Add support for Abort Signal handling in demuxer --- src/demuxer.ts | 16 ++++++++--- test/mux-web-streams.test.ts | 52 +++++++++++++++++++++++++++++++++--- 2 files changed, 61 insertions(+), 7 deletions(-) diff --git a/src/demuxer.ts b/src/demuxer.ts index ec1b294..7e2f4bc 100644 --- a/src/demuxer.ts +++ b/src/demuxer.ts @@ -58,6 +58,7 @@ export const demuxer = < >( stream: ReadableStream, numberOfStreams: number, + options?: { signal?: AbortSignal }, ): DemuxedReadableStreams => { // Validation if (!(stream instanceof ReadableStream)) { @@ -98,12 +99,21 @@ export const demuxer = < // Pipe this input stream into a WritableStream which recreates the original streams and emits them as events when they start writing stream.pipeTo( new WritableStream({ - async write(chunk) { + write(chunk, controller) { + if (options?.signal?.aborted) { + Object.values(demuxedStreamControllerById).forEach( + (demuxedStreamController) => { + demuxedStreamController.error('The demuxer was aborted.'); + }, + ); + return controller.error('The demuxer was aborted.'); + } + // The chunk received here may be a concatenation of multiple chunks from `muxer` (network pipes may buffer them together). // Split up the chunks so they match the original chunks we enqueued in `muxer`. const muxedChunks = getMuxedChunks(chunk); - muxedChunks.forEach((muxedChunk: Uint8Array) => { + for (const muxedChunk of muxedChunks) { // Read the header, which is a byte array of metadata prepended to the chunk const header = arrayToHeader(muxedChunk); @@ -127,7 +137,7 @@ export const demuxer = < // Otherwise, enqueue the muxedChunk to the appropriate stream. demuxedStreamController.enqueue(value); } - }); + } }, close() {}, abort(error: string) { diff --git a/test/mux-web-streams.test.ts b/test/mux-web-streams.test.ts index 54a689a..d044fd6 100644 --- a/test/mux-web-streams.test.ts +++ b/test/mux-web-streams.test.ts @@ -143,10 +143,10 @@ test('`muxer` throwing can be handled by a client', async () => { .catch(reject); }); - await assert.rejects( - promise, - "Error: Muxer cannot have more than 250 input streams. Stream 'id' must be a number between 0 and 250", - ); + await assert.rejects(promise, { + message: + "Muxer cannot have more than 250 input streams. Stream 'id' must be a number between 0 and 250", + }); }); // Test async is not blocking @@ -237,3 +237,47 @@ test('`muxer` gracefully handles cancels from reader', async () => { 'The muxer stream was canceled: I am no longer interested in this stream.', ); }); + +test('`demuxer` gracefully handles abort signal', async () => { + // Create an abort controller + const abortController = new AbortController(); + const signal = abortController.signal; + + // Create ReadableStreams from the test data + const originalStreams = Object.values(inputData).map((v) => + createStreamFromArray(v), + ); + + // Mux the streams together + const muxedStream = muxer(originalStreams); + + // Demux the stream + const demuxedStreams = demuxer(muxedStream, originalStreams.length, { + signal, + }); + + let i = 0; + const promise = new Promise((resolve, reject) => { + demuxedStreams[1]!.pipeTo( + new WritableStream({ + write() { + if (i === 1) { + return abortController.abort(); + } + i++; + }, + close() { + resolve(); + }, + abort(reason) { + reject(`Aborted at ${i}. ${reason}`); + }, + }), + ); + }); + + await assert.rejects(promise, (err) => { + assert.strictEqual(err as any, 'Aborted at 1. The demuxer was aborted.'); + return true; + }); +});