diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 8ae3fff11abbf1..61d85169607e19 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1678,6 +1678,10 @@ class PipeToReadableStreamReadRequest { // synchronous write during enqueue(). See WHATWG Streams spec // "ReadableStreamPipeTo" step 15's "chunk steps". queueMicrotask(() => { + if (this.writer[kState].stream === undefined) { + this.promise.resolve(false); + return; + } this.state.currentWrite = writableStreamDefaultWriterWrite(this.writer, chunk); markPromiseAsHandled(this.state.currentWrite); this.promise.resolve(false); diff --git a/test/parallel/test-webstreams-pipeto-writer-released-race.js b/test/parallel/test-webstreams-pipeto-writer-released-race.js new file mode 100644 index 00000000000000..64f84855a58867 --- /dev/null +++ b/test/parallel/test-webstreams-pipeto-writer-released-race.js @@ -0,0 +1,66 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { ReadableStream, WritableStream } = require('stream/web'); + +{ + let sourceController; + let destController; + + const source = new ReadableStream({ + start(controller) { + sourceController = controller; + }, + }); + + const dest = new WritableStream({ + start(controller) { + destController = controller; + }, + write() {}, + }); + + source.pipeTo(dest, { preventCancel: true }).then( + common.mustNotCall('pipeTo should not resolve'), + common.mustCall((err) => { + assert.strictEqual(err.message, 'destination errored'); + }) + ); + + setImmediate(common.mustCall(() => { + destController.error(new Error('destination errored')); + sourceController.enqueue('chunk'); + })); +} + +{ + const ac = new AbortController(); + let sourceController; + const chunks = []; + + const source = new ReadableStream({ + start(controller) { + sourceController = controller; + }, + }, { highWaterMark: 0 }); + + const dest = new WritableStream({ + write: common.mustCall((chunk) => { + chunks.push(chunk); + }), + }, { highWaterMark: 1 }); + + source.pipeTo(dest, { signal: ac.signal }).then( + common.mustNotCall('pipeTo should not resolve'), + common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.deepStrictEqual(chunks, ['chunk']); + }) + ); + + setImmediate(common.mustCall(() => { + sourceController.enqueue('chunk'); + ac.abort(); + })); +}