Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const {
SafePromiseAll,
SafePromisePrototypeFinally,
SafeSet,
StringPrototypeStartsWith,
Symbol,
TypeError,
TypedArrayPrototypeGetBuffer,
TypedArrayPrototypeGetByteLength,
Expand Down Expand Up @@ -94,6 +96,8 @@ const { UV_EOF } = internalBinding('uv');

const encoder = new TextEncoder();

const kValidateChunk = Symbol('kValidateChunk');

// Collect all negative (error) ZLIB codes and Z_NEED_DICT
const ZLIB_FAILURES = new SafeSet([
...ArrayPrototypeFilter(
Expand All @@ -115,7 +119,14 @@ function handleKnownInternalErrors(cause) {
case cause?.code === 'ERR_STREAM_PREMATURE_CLOSE': {
return new AbortError(undefined, { cause });
}
case ZLIB_FAILURES.has(cause?.code): {
case ZLIB_FAILURES.has(cause?.code):
// Brotli decoder error codes are formatted as 'ERR_' +
// BrotliDecoderErrorString(), where the latter returns strings like
// '_ERROR_FORMAT_...', '_ERROR_ALLOC_...', '_ERROR_UNREACHABLE', etc.
// The resulting JS error codes all start with 'ERR__ERROR_'.
// Falls through
case cause?.code != null &&
StringPrototypeStartsWith(cause.code, 'ERR__ERROR_'): {
// eslint-disable-next-line no-restricted-syntax
const error = new TypeError(undefined, { cause });
error.code = cause.code;
Expand All @@ -139,9 +150,10 @@ function handleKnownInternalErrors(cause) {

/**
* @param {Writable} streamWritable
* @param {object} [options]
* @returns {WritableStream}
*/
function newWritableStreamFromStreamWritable(streamWritable) {
function newWritableStreamFromStreamWritable(streamWritable, options = kEmptyObject) {
// Not using the internal/streams/utils isWritableNodeStream utility
// here because it will return false if streamWritable is a Duplex
// whose writable option is false. For a Duplex that is not writable,
Expand Down Expand Up @@ -220,12 +232,25 @@ function newWritableStreamFromStreamWritable(streamWritable) {
if (!streamWritable.writableObjectMode && isArrayBuffer(chunk)) {
chunk = new Uint8Array(chunk);
}
if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
backpressurePromise = PromiseWithResolvers();
return SafePromisePrototypeFinally(
backpressurePromise.promise, () => {
backpressurePromise = undefined;
});
// If the underlying Node.js stream throws synchronously from
// write() (e.g. zlib rejects an invalid chunk type), we must
// destroy the stream so that the readable side is also errored.
// Without this, the readable side would hang forever waiting
// for data that will never arrive. The same applies to any
// caller-provided chunk validation (e.g. web compression
// SharedArrayBuffer rejection).
try {
options[kValidateChunk]?.(chunk);
if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
backpressurePromise = PromiseWithResolvers();
return SafePromisePrototypeFinally(
backpressurePromise.promise, () => {
backpressurePromise = undefined;
});
}
} catch (error) {
destroy(streamWritable, error);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Destroying any adapted stream when .write() throws is problematic, as something as trivial as writing null will now bring down the entire stream, as opposed to just being rejected with a validation error.

Copy link
Member Author

@panva panva Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me look into it; it would help if you could make a particular suggestion.

throw error;
}
},

Expand Down Expand Up @@ -662,9 +687,14 @@ function newReadableWritablePairFromDuplex(duplex, options = kEmptyObject) {
return { readable, writable };
}

const writableOptions = {
__proto__: null,
[kValidateChunk]: options[kValidateChunk],
};

const writable =
isWritable(duplex) ?
newWritableStreamFromStreamWritable(duplex) :
newWritableStreamFromStreamWritable(duplex, writableOptions) :
new WritableStream();

if (!isWritable(duplex))
Expand Down Expand Up @@ -1064,4 +1094,5 @@ module.exports = {
newStreamDuplexFromReadableWritablePair,
newWritableStreamFromStreamBase,
newReadableStreamFromStreamBase,
kValidateChunk,
};
46 changes: 35 additions & 11 deletions lib/internal/webstreams/compression.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,27 @@ const {

const {
newReadableWritablePairFromDuplex,
kValidateChunk,
} = require('internal/webstreams/adapters');

const { customInspect } = require('internal/webstreams/util');

const {
isArrayBufferView,
isSharedArrayBuffer,
} = require('internal/util/types');

const {
customInspectSymbol: kInspect,
kEnumerableProperty,
} = require('internal/util');

const {
codes: {
ERR_INVALID_ARG_TYPE,
},
} = require('internal/errors');

const { createEnumConverter } = require('internal/webidl');

let zlib;
Expand All @@ -24,6 +36,18 @@ function lazyZlib() {
return zlib;
}

// Per the Compression Streams spec, chunks must be BufferSource
// (ArrayBuffer or ArrayBufferView not backed by SharedArrayBuffer).
// Per the Compression Streams spec, chunks must be BufferSource
// (ArrayBuffer or ArrayBufferView not backed by SharedArrayBuffer).
//
// Throws ERR_INVALID_ARG_TYPE (a TypeError) when the chunk is a raw
// SharedArrayBuffer or an ArrayBufferView over one; any other value is
// passed through and left to the underlying zlib stream's own
// validation. Called via the kValidateChunk hook inside the
// WritableStream write() path, where a throw errors both sides of the
// adapted stream.
function validateBufferSourceChunk(chunk) {
  // Reject both a bare SharedArrayBuffer chunk and a view whose backing
  // store is shared — neither is a valid BufferSource per WebIDL.
  if (isSharedArrayBuffer(chunk) ||
      (isArrayBufferView(chunk) && isSharedArrayBuffer(chunk.buffer))) {
    throw new ERR_INVALID_ARG_TYPE(
      'chunk',
      ['Buffer', 'TypedArray', 'DataView'],
      chunk,
    );
  }
}

const formatConverter = createEnumConverter('CompressionFormat', [
'deflate',
'deflate-raw',
Expand Down Expand Up @@ -62,7 +86,9 @@ class CompressionStream {
this.#handle = lazyZlib().createBrotliCompress();
break;
}
this.#transform = newReadableWritablePairFromDuplex(this.#handle);
this.#transform = newReadableWritablePairFromDuplex(this.#handle, {
[kValidateChunk]: validateBufferSourceChunk,
});
}

/**
Expand Down Expand Up @@ -108,25 +134,23 @@ class DecompressionStream {
});
break;
case 'deflate-raw':
this.#handle = lazyZlib().createInflateRaw();
this.#handle = lazyZlib().createInflateRaw({
rejectGarbageAfterEnd: true,
});
break;
case 'gzip':
this.#handle = lazyZlib().createGunzip({
rejectGarbageAfterEnd: true,
});
break;
case 'brotli':
this.#handle = lazyZlib().createBrotliDecompress();
this.#handle = lazyZlib().createBrotliDecompress({
rejectGarbageAfterEnd: true,
});
break;
}
this.#transform = newReadableWritablePairFromDuplex(this.#handle);

this.#handle.on('error', (err) => {
if (this.#transform?.writable &&
!this.#transform.writable.locked &&
typeof this.#transform.writable.abort === 'function') {
this.#transform.writable.abort(err);
}
this.#transform = newReadableWritablePairFromDuplex(this.#handle, {
[kValidateChunk]: validateBufferSourceChunk,
});
}

Expand Down
8 changes: 8 additions & 0 deletions test/common/wpt.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class ResourceLoader {
return {
ok: true,
arrayBuffer() { return data.buffer; },
bytes() { return new Uint8Array(data); },
json() { return JSON.parse(data.toString()); },
text() { return data.toString(); },
};
Expand Down Expand Up @@ -721,6 +722,7 @@ class WPTRunner {
// Mark the whole test as failed in wpt.fyi report.
reportResult?.finish('ERROR');
this.inProgress.delete(spec);
this.report?.write();
});

await events.once(worker, 'exit').catch(() => {});
Expand Down Expand Up @@ -787,6 +789,9 @@ class WPTRunner {
}
}

// Write the report on clean exit. The report is also written
// incrementally after each spec completes (see completionCallback)
// so that results survive if the process is killed.
this.report?.write();

const ran = queue.length;
Expand Down Expand Up @@ -873,6 +878,9 @@ class WPTRunner {
reportResult?.finish();
}
this.inProgress.delete(spec);
// Write report incrementally so results survive even if the process
// is killed before the exit handler runs.
this.report?.write();
// Always force termination of the worker. Some tests allocate resources
// that would otherwise keep it alive.
this.workers.get(spec).terminate();
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ See [test/wpt](../../wpt/README.md) for information on how these tests are run.
Last update:

- common: https://github.com/web-platform-tests/wpt/tree/dbd648158d/common
- compression: https://github.com/web-platform-tests/wpt/tree/67880a4eb8/compression
- compression: https://github.com/web-platform-tests/wpt/tree/ae05f5cb53/compression
- console: https://github.com/web-platform-tests/wpt/tree/e48251b778/console
- dom/abort: https://github.com/web-platform-tests/wpt/tree/dc928169ee/dom/abort
- dom/events: https://github.com/web-platform-tests/wpt/tree/0a811c5161/dom/events
Expand Down
57 changes: 57 additions & 0 deletions test/fixtures/wpt/compression/compression-bad-chunks.any.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// META: global=window,worker,shadowrealm
// META: script=resources/formats.js

'use strict';

// NOTE(review): this is a vendored web-platform-tests fixture; keep it in
// sync with upstream rather than editing locally.
// Each entry is a chunk that is not a valid BufferSource and must cause
// CompressionStream to error. `formats` comes from resources/formats.js.
const badChunks = [
  {
    name: 'undefined',
    value: undefined
  },
  {
    name: 'null',
    value: null
  },
  {
    name: 'numeric',
    value: 3.14
  },
  {
    name: 'object, not BufferSource',
    value: {}
  },
  {
    name: 'array',
    value: [65]
  },
  {
    name: 'SharedArrayBuffer',
    // Use a getter to postpone construction so that all tests don't fail where
    // SharedArrayBuffer is not yet implemented.
    get value() {
      // See https://github.com/whatwg/html/issues/5380 for why not `new SharedArrayBuffer()`
      return new WebAssembly.Memory({ shared:true, initial:1, maximum:1 }).buffer;
    }
  },
  {
    name: 'shared Uint8Array',
    get value() {
      // See https://github.com/whatwg/html/issues/5380 for why not `new SharedArrayBuffer()`
      return new Uint8Array(new WebAssembly.Memory({ shared:true, initial:1, maximum:1 }).buffer)
    }
  },
];

// For every compression format, writing a bad chunk must reject the
// pending write AND error the readable side (both promises reject with
// TypeError), so a consumer waiting on read() does not hang.
for (const format of formats) {
  for (const chunk of badChunks) {
    promise_test(async t => {
      const cs = new CompressionStream(format);
      const reader = cs.readable.getReader();
      const writer = cs.writable.getWriter();
      const writePromise = writer.write(chunk.value);
      const readPromise = reader.read();
      await promise_rejects_js(t, TypeError, writePromise, 'write should reject');
      await promise_rejects_js(t, TypeError, readPromise, 'read should reject');
    }, `chunk of type ${chunk.name} should error the stream for ${format}`);
  }
}

This file was deleted.

Loading