From dafa49884686f73f31d1e4478473ca2b9cdabd5d Mon Sep 17 00:00:00 2001 From: Bri <34875062+Monkatraz@users.noreply.github.com> Date: Mon, 28 Jul 2025 13:08:25 -0700 Subject: [PATCH] non-strict parsing --- PROTOCOL.md | 36 +++++++++ router/parsing.ts | 86 ++++++++++++++++++++ router/server.ts | 181 ++++++++++++++++++++++++++++++------------- transport/options.ts | 10 ++- transport/server.ts | 20 ++--- 5 files changed, 268 insertions(+), 65 deletions(-) create mode 100644 router/parsing.ts diff --git a/PROTOCOL.md b/PROTOCOL.md index e35a4cdd..58d2e1fc 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -250,6 +250,42 @@ type Control = `Control` is a payload that is wrapped with `TransportMessage`. +### Non-strict input parsing + +By default, River validates the input to procedure calls (`requestInit`, `requestData`, and so on) using a mundane type check via TypeBox. If we had a schema like so: + +```ts +const schema = Type.Object({ + a: Type.String(), + b: Type.Number(), +}); +``` + +Then the following input would fail to validate: + +```json +{ "a": "foo" } +``` + +as the `b` property is not present. + +Optionally, separately on the server and transport you can set `strictInputParsing` to `false` which will make River be more liberal on what it considers valid - even up to defaulting values if this would make the input pass validation. The previous example input that failed would actually pass in non-strict mode, and it would be parsed and yielded to procedure handlers as: + +```json +{ "a": "foo", "b": 0 } +``` + +Conceptually this is intended to be closest to [what proto3 has to do to enable wire-safe changes to schemas](https://protobuf.dev/programming-guides/proto3/#wire-safe-changes). It requires that procedure handlers do additional work in validating their input - but it makes it much safer to host River servers that are deployed separately from their consumers. 
Care must still be taken, as TypeBox is much more expressive and it's much harder to make the same guarantees as proto3 does. + +Non-strict parsing _only applies to user-provided values_. Everything else (typically control messages) is parsed strictly and must pass an immediate validation check. For servers, this means that `requestInit`, `requestData`, and any other procedure input objects are parsed non-strictly, and for transports the extended handshake input metadata is parsed non-strictly. + +Non-strict parsing should allow the following cases: + +- Adding an additional, non-optional field to a schema. +- Extending an enum's values (the server should default to the first enum member for unknown cases). + +TODO + ## Streams Streams tie together a series of messages into a single logical 'stream' of communication associated with a single remote procedure invocation. diff --git a/router/parsing.ts b/router/parsing.ts new file mode 100644 index 00000000..76cd5126 --- /dev/null +++ b/router/parsing.ts @@ -0,0 +1,86 @@ +import { Static, TSchema } from '@sinclair/typebox'; +import { Value } from '@sinclair/typebox/value'; +import { castTypeboxValueErrors, ValidationErrors } from './errors'; + +/** + * Result of parsing an input. This will either hold the result of parsing, + * which in non-strict mode may involve modifications to the input, or the errors + * if the parsing failed. + */ +export type ParseInputResult = + | { ok: true; value: T } + | { ok: false; errors: Static }; + +/** + * Options for {@link parseInput}. + */ +export interface ParseInputOptions { + /** + * If the schema should be parsed strictly, meaning that the input value + * is expected to match the schema exactly without any processing. This + * is the default behavior (`true`). + * + * In non-strict mode, the behavior is much closer to protobuf semantics - + * e.g. new fields are defaulted, unknown enum values are set to their first + * member, and so on. Inputs may still fail to parse, e.g. 
if they set a + * known field to the wrong type. + */ + strict?: boolean; + + /** + * The schema to parse the input against. + */ + schema: T; + + /** + * The input value to parse. + */ + input: unknown; +} + +/** + * Parse an input against a schema. This is intended for non-control, client + * provided input. This yields a {@link ParseInputResult} which will either + * hold the result of parsing, which in non-strict may involve modifications on + * the input, or the errors if the parsing failed. + * + * @see {@link ParseInputOptions} + */ +export function parseInput({ + strict, + schema, + input, +}: ParseInputOptions): ParseInputResult> { + // default path, we just check the value against the schema + if (strict) { + return Value.Check(schema, input) + ? { ok: true, value: input } + : { + ok: false, + errors: castTypeboxValueErrors(Value.Errors(schema, input)), + }; + } + + let parsed = input; + + try { + // TODO: switch to Value.Parse when we have it + // parsed = Value.Parse(['Clone', 'Clean', 'Default', 'Decode'], value); + parsed = Value.Clone(parsed); + parsed = Value.Clean(schema, parsed); + parsed = Value.Default(schema, parsed); + // skipped: Value.Convert(schema, parsed); + // unavailable: Value.Assert(schema, parsed); + parsed = Value.Decode(schema, parsed); + } catch { + return { + ok: false, + // we intentionally get the errors for the parsed value we currently have, + // as that signifies the point in parsing in which we failed to continue + // cleaning up the input. 
+ errors: castTypeboxValueErrors(Value.Errors(schema, parsed)), + }; + } + + return { ok: true, value: parsed }; +} diff --git a/router/server.ts b/router/server.ts index 87341e44..d252ab79 100644 --- a/router/server.ts +++ b/router/server.ts @@ -46,6 +46,7 @@ import { ServerTransport } from '../transport/server'; import { ReadableImpl, WritableImpl } from './streams'; import { IdentifiedSession } from '../transport/sessionStateMachine/common'; import { SessionBoundSendFn } from '../transport/transport'; +import { parseInput } from './parsing'; type StreamId = string; @@ -121,6 +122,12 @@ class RiverServer< private log?: Logger; private middlewares: Array; + /** + * If the server is configured to parse input strictly, or try to use + * more protobuf-like semantics (if `false`). + */ + private readonly strictInputParsing: boolean; + /** * We create a tombstones for streams cancelled by the server * so that we don't hit errors when the client has inflight @@ -140,14 +147,23 @@ class RiverServer< private unregisterTransportListeners: () => void; - constructor( - transport: ServerTransport, - services: Services, - handshakeOptions?: ServerHandshakeOptions, - extendedContext?: Context, + constructor({ + transport, + services, + handshakeOptions, + extendedContext, + strictInputParsing = true, maxCancelledStreamTombstonesPerSession = 200, - middlewares: Array = [], - ) { + middlewares = [], + }: { + transport: ServerTransport; + services: Services; + handshakeOptions?: ServerHandshakeOptions; + extendedContext?: Context; + strictInputParsing?: boolean; + maxCancelledStreamTombstonesPerSession?: number; + middlewares?: Array; + }) { const instances: Record = {}; this.middlewares = middlewares; @@ -180,6 +196,7 @@ class RiverServer< this.maxCancelledStreamTombstonesPerSession = maxCancelledStreamTombstonesPerSession; this.log = transport.log; + this.strictInputParsing = strictInputParsing; const handleCreatingNewStreams = (message: EventMap['message']) => { if (message.to 
!== this.transport.clientId) { @@ -352,28 +369,37 @@ class RiverServer< return; } - // normal request data for upload or stream - if ( - 'requestData' in procedure && - Value.Check(procedure.requestData, msg.payload) - ) { - reqReadable._pushValue(Ok(msg.payload)); - + // we may either be parsing some request data input, or this is a control + // message - we'll handle both cases at once. + const requestDataResult = + 'requestData' in procedure + ? parseInput({ + strict: this.strictInputParsing, + schema: procedure.requestData, + input: msg.payload, + }) + : null; + + // most expected case: requestData present, parse ok + // as an optimization, we check this case first + if (requestDataResult?.ok) { + reqReadable._pushValue(Ok(requestDataResult.value)); + + // It's atypical for any of our post-v1 clients to send a close with a + // request payload, but it's technically legal, so we'll handle it. if (isStreamCloseBackwardsCompat(msg.controlFlags, protocolVersion)) { - // It's atypical for any of our post-v1 clients to send a close with a - // request payload, but it's technically legal, so we'll handle it. closeReadable(); } return; } + // less common expected case: the client is closing a stream or upload + // procedure after they're done (control message) if ( Value.Check(ControlMessagePayloadSchema, msg.payload) && isStreamCloseBackwardsCompat(msg.controlFlags, protocolVersion) ) { - // Clients typically send this shape of close for stream and upload - // after they're done. 
closeReadable(); return; @@ -382,16 +408,15 @@ class RiverServer< // We couldn't make sense of the message, it's probably a bad request let validationErrors: Static; let errMessage: string; - if ('requestData' in procedure) { + + if (requestDataResult) { errMessage = 'message in requestData position did not match schema'; - validationErrors = castTypeboxValueErrors( - Value.Errors(procedure.requestData, msg.payload), - ); + validationErrors = requestDataResult.errors; } else { + errMessage = 'message in control payload position did not match schema'; validationErrors = castTypeboxValueErrors( Value.Errors(ControlMessagePayloadSchema, msg.payload), ); - errMessage = 'message in control payload position did not match schema'; } this.log?.warn(errMessage, { @@ -892,39 +917,62 @@ class RiverServer< return null; } + let initPayload: unknown; let passInitAsDataForBackwardsCompat = false; + + // TODO remove once clients migrate to v2 + // In v1.1 sometimes the first message is not `init`, but instead it's the `input` + // this backwards compatibility path requires procedures to define their `init` as + // an empty-object-compatible-schema (i.e. either actually empty or optional values) + // The reason we don't check if `init` is satisfied here is because false positives + // are easy to hit, we'll err on the side of caution and treat it as a request, servers + // that expect v1.1 clients should handle this case themselves. if ( session.protocolVersion === 'v1.1' && (procedure.type === 'upload' || procedure.type === 'stream') && - Value.Check(procedure.requestData, initMessage.payload) && Value.Check(procedure.requestInit, {}) ) { - // TODO remove once clients migrate to v2 - // In v1.1 sometimes the first message is not `init`, but instead it's the `input` - // this backwards compatibility path requires procedures to define their `init` as - // an empty-object-compatible-schema (i.e. 
either actually empty or optional values) - // The reason we don't check if `init` is satisified here is because false positives - // are easy to hit, we'll err on the side of caution and treat it as a request, servers - // that expect v1.1 clients should handle this case themselves. - passInitAsDataForBackwardsCompat = true; - } else if (!Value.Check(procedure.requestInit, initMessage.payload)) { - const errMessage = `procedure init failed validation`; - this.log?.warn(errMessage, { - ...session.loggingMetadata, - clientId: this.transport.clientId, - transportMessage: initMessage, - tags: ['invalid-request'], + const requestDataResult = parseInput({ + strict: this.strictInputParsing, + schema: procedure.requestData, + input: initMessage.payload, }); - cancelStream( - initMessage.streamId, - Err({ - code: INVALID_REQUEST_CODE, - message: errMessage, - }), - ); + // we fallback to our usual logic if this fails + if (requestDataResult?.ok) { + initPayload = requestDataResult.value; + passInitAsDataForBackwardsCompat = true; + } + } - return null; + if (!passInitAsDataForBackwardsCompat) { + const requestInitResult = parseInput({ + strict: this.strictInputParsing, + schema: procedure.requestInit, + input: initMessage.payload, + }); + + if (!requestInitResult?.ok) { + const errMessage = `procedure init failed validation`; + this.log?.warn(errMessage, { + ...session.loggingMetadata, + clientId: this.transport.clientId, + transportMessage: initMessage, + tags: ['invalid-request'], + }); + + cancelStream( + initMessage.streamId, + Err({ + code: INVALID_REQUEST_CODE, + message: errMessage, + }), + ); + + return null; + } + + initPayload = requestInitResult.value; } return { @@ -933,7 +981,7 @@ class RiverServer< procedureName: initMessage.procedureName, serviceName: initMessage.serviceName, tracingCtx: initMessage.tracing, - initPayload: initMessage.payload, + initPayload, sessionMetadata: sessionMetadata, procedure, serviceContext, @@ -1076,25 +1124,48 @@ export function 
createServer< transport: ServerTransport, services: Services, providedServerOptions?: Partial<{ + /** + * An optional object containing additional handshake options to be + * passed to the transport. + */ handshakeOptions?: ServerHandshakeOptions; + + /** + * An optional object containing additional context to be passed to all services. + */ extendedContext?: Context; + + /** + * If `true` (the default), then the server will parse input (non-control, + * user-provided) values strictly, meaning that the input value is expected + * to match the schema exactly without any processing. + * + * You should consider making sure your transport has this set as well if you + * change this value from its default. For transports, this mainly handles + * the handshake metadata parsing - while this is for procedures. + */ + strictInputParsing?: boolean; + /** * Maximum number of cancelled streams to keep track of to avoid * cascading stream errors. */ maxCancelledStreamTombstonesPerSession?: number; + /** - * Middlewares run before procedure handlers allowing you to inspect requests and responses.. + * Middlewares run before procedure handlers allowing you to inspect requests and responses. 
*/ middlewares?: Array; }>, ): Server { - return new RiverServer( + return new RiverServer({ transport, services, - providedServerOptions?.handshakeOptions, - providedServerOptions?.extendedContext, - providedServerOptions?.maxCancelledStreamTombstonesPerSession, - providedServerOptions?.middlewares, - ); + handshakeOptions: providedServerOptions?.handshakeOptions, + extendedContext: providedServerOptions?.extendedContext, + strictInputParsing: providedServerOptions?.strictInputParsing, + maxCancelledStreamTombstonesPerSession: + providedServerOptions?.maxCancelledStreamTombstonesPerSession, + middlewares: providedServerOptions?.middlewares, + }); } diff --git a/transport/options.ts b/transport/options.ts index 995ebbb1..fd99fd15 100644 --- a/transport/options.ts +++ b/transport/options.ts @@ -33,10 +33,18 @@ export const defaultClientTransportOptions: ClientTransportOptions = { ...defaultConnectionRetryOptions, }; -export type ServerTransportOptions = TransportOptions; +export type ServerTransportOptions = TransportOptions & { + /** + * If `true` (the default), then the transport/server will parse input + * (non-control, user-provided) values strictly, meaning that the input value + * is expected to match the schema exactly without any processing. 
+ */ + strictInputParsing: boolean; +}; export type ProvidedServerTransportOptions = Partial; export const defaultServerTransportOptions: ServerTransportOptions = { ...defaultTransportOptions, + strictInputParsing: true, }; diff --git a/transport/server.ts b/transport/server.ts index 8a7f8554..9e5ff105 100644 --- a/transport/server.ts +++ b/transport/server.ts @@ -29,6 +29,7 @@ import { ServerSession, ServerSessionStateGraph, } from './sessionStateMachine/transitions'; +import { parseInput } from '../router/parsing'; export abstract class ServerTransport< ConnType extends Connection, @@ -267,7 +268,13 @@ export abstract class ServerTransport< // invariant: must pass custom validation if defined let parsedMetadata: ParsedMetadata = {} as ParsedMetadata; if (this.handshakeExtensions) { - if (!Value.Check(this.handshakeExtensions.schema, msg.payload.metadata)) { + const parseMetadataResult = parseInput({ + strict: this.options.strictInputParsing, + schema: this.handshakeExtensions.schema, + input: msg.payload.metadata, + }); + + if (!parseMetadataResult?.ok) { this.rejectHandshakeRequest( session, msg.from, @@ -276,12 +283,7 @@ export abstract class ServerTransport< { ...session.loggingMetadata, connectedTo: msg.from, - validationErrors: [ - ...Value.Errors( - this.handshakeExtensions.schema, - msg.payload.metadata, - ), - ], + validationErrors: parseMetadataResult.errors, }, ); @@ -294,7 +296,7 @@ export abstract class ServerTransport< const parsedMetadataOrFailureCode = await this.handshakeExtensions.validate( - msg.payload.metadata, + parseMetadataResult.value, previousParsedMetadata, ); @@ -304,7 +306,7 @@ export abstract class ServerTransport< return; } - // handler rejected the connection + // handler rejected the connection (the value is a special error string) if ( Value.Check( HandshakeErrorCustomHandlerFatalResponseCodes,