diff --git a/.github/workflows/ensure-labels.yml b/.github/workflows/ensure-labels.yml index 2a506924..81d2ed74 100644 --- a/.github/workflows/ensure-labels.yml +++ b/.github/workflows/ensure-labels.yml @@ -24,5 +24,5 @@ jobs: - name: Check one of required labels are set uses: docker://agilepathway/pull-request-label-checker:v1.6.65 with: - one_of: major,minor,patch,skip-release + one_of: major,minor,patch,skip-release,release-same-version repo_token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 4da12fc7..f1e5c9da 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -23,6 +23,7 @@ jobs: has_changes: ${{ steps.finalize.outputs.has_changes }} bump: ${{ steps.finalize.outputs.bump }} should_publish: ${{ steps.finalize.outputs.should_publish }} + same_version: ${{ steps.finalize.outputs.same_version }} steps: - name: Set default outputs id: defaults @@ -31,6 +32,7 @@ jobs: echo "has_changes=false" >> $GITHUB_OUTPUT echo "bump=patch" >> $GITHUB_OUTPUT echo "should_publish=false" >> $GITHUB_OUTPUT + echo "same_version=false" >> $GITHUB_OUTPUT - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 with: @@ -79,11 +81,11 @@ jobs: echo "PR labels: $LABELS" echo "labels=$LABELS" >> $GITHUB_OUTPUT - # Check if PR has version bump labels - if echo "$LABELS" | grep -qE '\b(patch|minor|major)\b'; then + # Check if PR has a release label (a version-bump label or release-same-version) + if echo "$LABELS" | grep -qE '\b(patch|minor|major|release-same-version)\b'; then echo "should_publish=true" >> $GITHUB_OUTPUT else - echo "No version bump label found (patch/minor/major)" + echo "No release label found (patch/minor/major/release-same-version)" echo "should_publish=false" >> $GITHUB_OUTPUT fi @@ -93,13 +95,23 @@ jobs: env: LABELS: ${{ steps.pr-info.outputs.labels }} run: | - if echo "$LABELS" | grep -qE '\bmajor\b'; then + # release-same-version takes precedence: publish 
the versions already in + # package.json as-is, with no bump. Use it when versions were set explicitly + # in the PR (e.g. a deliberate, hand-committed semver-major bump). + if echo "$LABELS" | grep -qE '\brelease-same-version\b'; then + echo "same_version=true" >> $GITHUB_OUTPUT + echo "bump=none" >> $GITHUB_OUTPUT + echo "Release mode: publish current package.json versions without bumping" + elif echo "$LABELS" | grep -qE '\bmajor\b'; then + echo "same_version=false" >> $GITHUB_OUTPUT echo "bump=major" >> $GITHUB_OUTPUT echo "Version bump: major" elif echo "$LABELS" | grep -qE '\bminor\b'; then + echo "same_version=false" >> $GITHUB_OUTPUT echo "bump=minor" >> $GITHUB_OUTPUT echo "Version bump: minor" else + echo "same_version=false" >> $GITHUB_OUTPUT echo "bump=patch" >> $GITHUB_OUTPUT echo "Version bump: patch" fi @@ -240,6 +252,8 @@ jobs: DEFAULT_BUMP: ${{ steps.defaults.outputs.bump }} PR_SHOULD_PUBLISH: ${{ steps.pr-info.outputs.should_publish }} DEFAULT_SHOULD_PUBLISH: ${{ steps.defaults.outputs.should_publish }} + VERSION_SAME: ${{ steps.version.outputs.same_version }} + DEFAULT_SAME_VERSION: ${{ steps.defaults.outputs.same_version }} run: | # Use build-matrix outputs if available, otherwise defaults if [ -n "$BUILD_MATRIX" ]; then @@ -266,6 +280,12 @@ jobs: echo "should_publish=$DEFAULT_SHOULD_PUBLISH" >> $GITHUB_OUTPUT fi + if [ -n "$VERSION_SAME" ]; then + echo "same_version=$VERSION_SAME" >> $GITHUB_OUTPUT + else + echo "same_version=$DEFAULT_SAME_VERSION" >> $GITHUB_OUTPUT + fi + # Single job that bumps, publishes, and pushes tags/commits at the end # This avoids the "checkout wrong commit" problem of multi-job workflows release: @@ -305,8 +325,10 @@ jobs: - name: Install dependencies run: pnpm install --frozen-lockfile --ignore-scripts + # Skipped for release-same-version: the versions in package.json are published as-is. 
- name: Bump versions for changed packages id: bump + if: needs.detect-changes.outputs.same_version != 'true' env: MATRIX: ${{ needs.detect-changes.outputs.matrix }} BUMP: ${{ needs.detect-changes.outputs.bump }} diff --git a/biome.json b/biome.json index e55fa69d..b0390747 100644 --- a/biome.json +++ b/biome.json @@ -15,5 +15,17 @@ "noUnusedPrivateClassMembers": "off" } } - } + }, + "overrides": [ + { + "includes": ["**/bench/**"], + "linter": { + "rules": { + "suspicious": { + "noConsole": "off" + } + } + } + } + ] } diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index e9b7aefd..2f1a478b 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -51,6 +51,12 @@ export abstract class AbstractAmqpPublisher< dependencies: AMQPDependencies, options: AMQPPublisherOptions, ) { + if (options.codec) { + throw new Error( + 'codec is not supported by AbstractAmqpPublisher. Remove the codec option or use an SQS/SNS publisher.', + ) + } + super(dependencies, options) this.messageSchemaContainer = this.resolvePublisherMessageSchemaContainer(options) diff --git a/packages/amqp/package.json b/packages/amqp/package.json index 9bd0d2d2..6030de1e 100644 --- a/packages/amqp/package.json +++ b/packages/amqp/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/amqp", - "version": "24.0.0", + "version": "24.1.0", "engines": { "node": ">=18" }, @@ -43,7 +43,7 @@ "@biomejs/biome": "^2.3.8", "@lokalise/biome-config": "^3.1.0", "@lokalise/tsconfig": "^3.0.0", - "@message-queue-toolkit/core": "*", + "@message-queue-toolkit/core": "workspace:*", "@types/amqplib": "0.10.8", "@types/node": "^25.5.0", "@vitest/coverage-v8": "^4.0.18", diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index 3822aa82..7962d7e6 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ 
b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -6,6 +6,7 @@ import { asClass, asFunction, Lifetime } from 'awilix' import { asMockFunction } from 'awilix-manager' import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' import { ZodError } from 'zod/v4' +import { AbstractAmqpQueuePublisher } from '../../lib/AbstractAmqpQueuePublisher.ts' import { deserializeAmqpMessage } from '../../lib/amqpMessageDeserializer.ts' import { AmqpPermissionConsumer } from '../consumers/AmqpPermissionConsumer.ts' import type { @@ -25,6 +26,17 @@ import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext.ts' import { AmqpPermissionPublisher } from './AmqpPermissionPublisher.ts' describe('PermissionPublisher', () => { + describe('constructor', () => { + it('throws when codec option is set (codec is not supported by AMQP publishers)', () => { + // AmqpPermissionPublisher strips unknown options before calling super(), so we test + // the guard via a minimal pass-through subclass that mirrors real user code. + class TestPublisher extends AbstractAmqpQueuePublisher<{ messageType: string }> {} + expect(() => new TestPublisher({} as any, { codec: 'zstd' } as any)).toThrow( + 'codec is not supported by AbstractAmqpPublisher', + ) + }) + }) + describe('logging', () => { let logger: FakeLogger let diContainer: AwilixContainer diff --git a/packages/core/README.md b/packages/core/README.md index 125a1285..b83760cd 100644 --- a/packages/core/README.md +++ b/packages/core/README.md @@ -641,6 +641,58 @@ class MyPayloadStore implements PayloadStore { } ``` +#### Message compression (codec) + +Publishers can compress outgoing messages by setting `codec` in their options. The codec implementation (zstd via the Node.js built-in `zlib` module) ships inside `@message-queue-toolkit/core` — there is no extra package to install. Requires **Node.js >=22.15.0**. Only the SQS and SNS adapters support compression. 
+ +**Built-in zstd:** + +```typescript +import { MessageCodecEnum } from '@message-queue-toolkit/core' + +new MyPublisher(deps, { + codec: MessageCodecEnum.ZSTD, + // Optional: skip compression for messages below this byte threshold (default: 512). + // Small messages often expand when compressed; set to 0 to always compress. + skipCompressionBelow: 512, +}) +``` + +**Custom codec** (bring your own compression library): + +```typescript +import type { MessageCodecHandler } from '@message-queue-toolkit/core' + +class MyLz4Handler implements MessageCodecHandler { + async compress(data: Buffer): Promise<Buffer> { /* ... */ } + async decompress(data: Buffer): Promise<Buffer> { /* ... */ } + createCompressStream(): Transform { /* return a Transform stream (from 'node:stream') */ } +} + +const codec = { name: 'lz4', handler: new MyLz4Handler() } +new MyPublisher(deps, { codec }) +new MyConsumer(deps, { codecs: [codec] }) // register custom codec on the consumer +``` + +Compressed messages are wrapped in a self-describing envelope `{ __mqtCodec: '<codec name>', __mqtData: '<base64>', ...preserved fields }`. The message's identity/routing fields (`id`, `timestamp`, `type`, deduplication fields) are copied alongside `__mqtData` as plaintext — the same fields an offloaded-payload pointer preserves — so broker-side filtering (e.g. SNS body-scoped `FilterPolicy`) keeps working on them. Built-in codecs (e.g. zstd) are auto-detected on every consumer — no consumer option needed. For custom codecs, pass `codecs: [{ name, handler }]` to register them on the consumer. + +> **Roll out consumers before publishers:** auto-detection only works on a consumer running a library version that supports the codec (and, for custom codecs, with that codec registered). Upgrade all consumers of a queue first, then enable `codec` on publishers. + +#### Interaction with codec (compression) + +When both `codec` and `payloadStoreConfig` are set on a publisher, compression and offloading work together with a single compression pass: + +1. 
The message is compressed **once** at publish time. +2. The **codec envelope wire size** (base64-encoded compressed bytes + JSON framing) is compared against `messageSizeThreshold`. +3. If the envelope size exceeds the threshold, the raw compressed bytes are stored in the payload store. The codec name is written to `payloadRef.codec` so the consumer knows how to decompress after retrieval. +4. If the envelope size fits within the threshold, the message is sent inline as a self-describing codec envelope — S3 is never touched. + +`skipCompressionBelow` is honored here too: a message whose serialized JSON is below the threshold skips compression entirely and is offloaded (or sent inline) as plain JSON. + +> **Note:** for large payloads the compress-and-offload path streams the message through a temporary file under `os.tmpdir()` to avoid buffering the whole payload in memory. The temp file is always removed in a `finally` block. Environments with a read-only or unavailable temp directory (rare; AWS Lambda's `/tmp` is writable) cannot use the codec + payload-store combination. + +This means compression can prevent offloading entirely for messages that are large before compression but small after. + ## API Reference ### Types diff --git a/packages/core/lib/codec/codecHandler.ts b/packages/core/lib/codec/codecHandler.ts new file mode 100644 index 00000000..f1515267 --- /dev/null +++ b/packages/core/lib/codec/codecHandler.ts @@ -0,0 +1,129 @@ +import type { Transform } from 'node:stream' +import { promisify } from 'node:util' +import zlib from 'node:zlib' +import type { MessageCodecHandler, MessageCodecRegistration } from './messageCodec.ts' +import { MessageCodecEnum } from './messageCodec.ts' + +const ZSTD_UNSUPPORTED_MSG = + 'zlib.zstdCompress and zlib.zstdDecompress are not available in this Node.js version. ' + + 'Message compression requires Node.js >=22.15.0 or >=23.8.0.' 
+ +/** + * Default upper bound on the decompressed size of a single message, in bytes (100 MiB). + * + * Protects consumers from decompression-bomb inputs: a tiny compressed envelope can + * otherwise expand to gigabytes of highly-repetitive data and exhaust process memory. + * 100 MiB is far above any realistic queue message (SQS/SNS cap bodies at 256 KiB, and + * even offloaded payloads are typically single-digit MiB) while still bounding the blast + * radius of a malicious or corrupt frame. Override via the {@link ZstdCodecHandler} + * constructor if you legitimately handle larger messages. + */ +export const DEFAULT_MAX_DECOMPRESSED_BYTES = 100 * 1024 * 1024 + +// Feature-detected once at module load — undefined on Node versions that lack zstd support. +// Guarding the lookup means importing core never throws on older Node; only an +// actual compress/decompress call fails, and only when zstd is genuinely used. +const zstdCompress = + typeof zlib.zstdCompress === 'function' ? promisify(zlib.zstdCompress) : undefined +const zstdDecompress = + typeof zlib.zstdDecompress === 'function' ? promisify(zlib.zstdDecompress) : undefined + +export class ZstdCodecHandler implements MessageCodecHandler { + private readonly maxDecompressedBytes: number + + /** + * @param maxDecompressedBytes upper bound on a single decompressed message, in bytes. + * Defaults to {@link DEFAULT_MAX_DECOMPRESSED_BYTES} (100 MiB). Decompression of an + * input that would exceed this limit is rejected before the full payload is buffered. 
+ */ + constructor(maxDecompressedBytes: number = DEFAULT_MAX_DECOMPRESSED_BYTES) { + this.maxDecompressedBytes = maxDecompressedBytes + } + + compress(data: Buffer): Promise<Buffer> { + if (!zstdCompress) return Promise.reject(new Error(ZSTD_UNSUPPORTED_MSG)) + return zstdCompress(data) + } + + decompress(data: Buffer): Promise<Buffer> { + if (!zstdDecompress) return Promise.reject(new Error(ZSTD_UNSUPPORTED_MSG)) + // Rejects (never throws synchronously) on unsupported runtimes. maxOutputLength caps the + // output size: exceeding it rejects with a RangeError, guarding against decompression bombs. + return zstdDecompress(data, { maxOutputLength: this.maxDecompressedBytes }) + } + + createCompressStream(): Transform { + if (typeof zlib.createZstdCompress !== 'function') throw new Error(ZSTD_UNSUPPORTED_MSG) + return zlib.createZstdCompress() + } +} + +const ZSTD_HANDLER = new ZstdCodecHandler() + +/** + * Allowed characters for a custom codec name: ASCII letters, digits, hyphens, underscores. + * This keeps the name JSON-safe without escaping and makes it a recognisable identifier. + */ +const SAFE_CODEC_NAME_RE = /^[A-Za-z0-9_-]+$/ + +/** + * Returns the name string that will be written into the `__mqtCodec` field of every envelope. + * Throws for custom (object-form) registrations whose name is not a string or contains + * characters that would produce invalid JSON when interpolated raw into the envelope string. + */ +export function getCodecName(codec: MessageCodecRegistration): string { + if (typeof codec === 'object') { + if (typeof codec.name !== 'string' || !SAFE_CODEC_NAME_RE.test(codec.name)) { + throw new Error( + `Invalid codec name "${codec.name}": only ASCII letters, digits, hyphens, and underscores are allowed`, + ) + } + return codec.name + } + return codec +} + +/** + * Resolves the {@link MessageCodecHandler} for the given codec registration. + * + * - String form (`MessageCodec`): returns the built-in handler for that codec. + * - Object form (`{ name, handler }`): returns the provided handler directly. 
+ */ +export function resolveCodecHandler(codec: MessageCodecRegistration): MessageCodecHandler { + if (typeof codec === 'object') return codec.handler + if (codec === MessageCodecEnum.ZSTD) return ZSTD_HANDLER + throw new Error(`Unsupported codec: ${codec}`) +} + +/** + * Wraps an already-compressed buffer in a codec envelope string. + * Use this when you have pre-compressed bytes and want to avoid compressing twice. + * + * `preservedFields`, when provided, are emitted as plaintext siblings of the codec + * fields (`{ ...preserved, __mqtCodec, __mqtData }`). Publishers use this to keep + * identity/routing fields (`id`, `type`, …) visible on the wire so broker-side + * filtering (e.g. SNS body-scoped FilterPolicy) still works on compressed messages — + * the same fields an offloaded-payload pointer carries. The codec fields are written + * last, so a colliding preserved key can never corrupt the envelope; consumers ignore + * the preserved siblings and decode `__mqtData` only. + * + * Without `preservedFields` the fast path uses string concatenation instead of + * JSON.stringify, avoiding an intermediate object — the base64 string and the + * envelope string are the only two allocations on the inline path. + * + * `codecName` must already be a JSON-safe identifier (see {@link getCodecName}, + * which is enforced for every registration before it reaches this function). + */ +export function buildCodecEnvelope( + compressed: Buffer, + codecName: string, + preservedFields?: Record<string, unknown>, +): string { + const data = compressed.toString('base64') + if (!preservedFields || Object.keys(preservedFields).length === 0) { + return `{"__mqtCodec":"${codecName}","__mqtData":"${data}"}` + } + // Preserved fields present: a single JSON.stringify handles all value escaping. + // Codec fields are listed last so they always win over any colliding preserved key. 
+ return JSON.stringify({ ...preservedFields, __mqtCodec: codecName, __mqtData: data }) +} diff --git a/packages/core/lib/codec/messageCodec.ts b/packages/core/lib/codec/messageCodec.ts new file mode 100644 index 00000000..7d22c449 --- /dev/null +++ b/packages/core/lib/codec/messageCodec.ts @@ -0,0 +1,137 @@ +type ObjectValues = T[keyof T] + +/** + * Supported message compression codecs. + * + * Use the enum values instead of raw strings so that adding a new codec in + * the future is a single-place change and consumers benefit from IDE + * auto-complete. + * + * @example + * new MyPublisher(deps, { codec: MessageCodecEnum.ZSTD }) + */ +export const MessageCodecEnum = { + /** zstd compression via Node.js built-in `zlib` (requires Node.js >=22.15.0). */ + ZSTD: 'zstd', +} as const +export type MessageCodec = ObjectValues + +const CODEC_FIELD = '__mqtCodec' +const DATA_FIELD = '__mqtData' + +export type CodecEnvelope = { + // string (not MessageCodec) to accommodate user-supplied codec names. + [CODEC_FIELD]: string + [DATA_FIELD]: string +} + +/** + * Low-level interface for a compression codec. + * + * Implement this interface to plug in a custom compression algorithm. + * The built-in implementation (`ZstdCodecHandler`, exported from + * `@message-queue-toolkit/core`) uses Node.js built-in `zlib` zstd support. + * + * All three methods are required: + * - `compress` / `decompress` are used for the inline (non-offloaded) path on both sides. + * - `createCompressStream` is used by the streaming offload path to pipe serialized + * JSON directly through compression into the payload store without buffering the + * full payload in memory. + * + * Note the deliberate asymmetry: there is no `createDecompressStream`. Decompression is + * always buffer-based (`decompress`) because the consumer must `JSON.parse` the whole + * payload anyway — a streaming decompressor could not avoid materializing it. 
For an + * offloaded compressed payload this means the compressed bytes and the decompressed + * payload are both briefly resident in memory; bound the worst case with the + * `maxDecompressedBytes` argument of {@link ZstdCodecHandler} (or the equivalent in a + * custom handler). + */ +export interface MessageCodecHandler { + compress(data: Buffer): Promise + /** + * Decompresses a full compressed buffer. Buffer-based by design (see the interface + * note above); a custom implementation should cap the decompressed size to guard + * against decompression bombs. + */ + decompress(data: Buffer): Promise + /** Returns a Transform stream that compresses its input using this codec. */ + createCompressStream(): import('node:stream').Transform +} + +/** + * Passed to the `codec` option to select a compression codec. + * + * - **String form** (`MessageCodec`): selects one of the built-in codecs + * (e.g. `MessageCodecEnum.ZSTD`). + * - **Object form** (`{ name, handler }`): plugs in a custom + * `MessageCodecHandler` implementation under a user-chosen name. The name + * is written into the `__mqtCodec` field of every envelope so the consumer + * can identify and route to the correct handler. + * + * @example Built-in zstd + * new MyPublisher(deps, { codec: MessageCodecEnum.ZSTD }) + * + * @example Custom codec + * import { LZ4Handler } from './lz4Handler.ts' + * const codec = { name: 'lz4', handler: new LZ4Handler() } + * new MyPublisher(deps, { codec }) + * new MyConsumer(deps, { codecs: [codec] }) // register the same codec on the consumer + */ +export type MessageCodecRegistration = MessageCodec | { name: string; handler: MessageCodecHandler } + +/** + * Base64 pattern: groups of 4 chars from the alphabet, with at most 2 trailing `=` pads. + * An empty string (compressed payload of 0 bytes) is also valid. + * Exported so codec implementations can reuse it without duplicating the regex. 
+ */ +export const BASE64_RE = + /^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{4})?$/ + +/** Built once at module load — avoids a fresh array allocation on every hot-path call. */ +export const KNOWN_CODECS: ReadonlySet<string> = new Set(Object.values(MessageCodecEnum)) + +/** + * Structural check: returns true when `value` has the shape of a codec envelope — + * a non-empty string `__mqtCodec` and a base64 `__mqtData` — **regardless of whether + * the named codec is one this consumer can decode**. + * + * Detection is **presence-based**: extra sibling fields are allowed, because publishers + * copy identity/routing fields (`id`, `type`, …) alongside the codec fields so + * broker-side filtering (e.g. SNS body-scoped FilterPolicy) keeps working on compressed + * messages. This mirrors how an offloaded-payload pointer is recognised by its + * `payloadRef` shape, not by an exact object shape. + * + * Consumers use this (rather than {@link isCodecEnvelope}) so an envelope for an + * unregistered codec can be told apart from an ordinary message and surfaced as a + * misconfiguration instead of being validated as an incomplete skeleton. The cheap + * `in` checks run first, so a non-envelope value returns without allocating anything. + */ +export function hasCodecEnvelopeShape(value: unknown): value is CodecEnvelope { + if (typeof value !== 'object' || value === null) return false + const record = value as Record<string, unknown> + return ( + CODEC_FIELD in record && + DATA_FIELD in record && + typeof record[CODEC_FIELD] === 'string' && + (record[CODEC_FIELD] as string).length > 0 && + typeof record[DATA_FIELD] === 'string' && + // Validate __mqtData is a properly-padded base64 string before handing it to the codec. + BASE64_RE.test(record[DATA_FIELD] as string) + ) +} + +/** + * Returns true when `value` is a codec envelope **for a codec in `knownCodecs`** — i.e. + * one this consumer can actually decode. 
Combines the structural + * {@link hasCodecEnvelopeShape} check with a codec-name lookup. + * + * Pass `knownCodecs` to restrict the match to the codecs your consumer is configured to + * handle. Defaults to the built-in codec set — backwards-compatible for consumers that + * don't configure a codec. + */ +export function isCodecEnvelope( + value: unknown, + knownCodecs: ReadonlySet<string> = KNOWN_CODECS, +): value is CodecEnvelope { + return hasCodecEnvelopeShape(value) && knownCodecs.has((value as CodecEnvelope).__mqtCodec) +} diff --git a/packages/core/lib/index.ts b/packages/core/lib/index.ts index bb97ea60..d9d1b52b 100644 --- a/packages/core/lib/index.ts +++ b/packages/core/lib/index.ts @@ -1,3 +1,21 @@ +export { + buildCodecEnvelope, + DEFAULT_MAX_DECOMPRESSED_BYTES, + getCodecName, + resolveCodecHandler, + ZstdCodecHandler, +} from './codec/codecHandler.ts' +export { + BASE64_RE, + type CodecEnvelope, + hasCodecEnvelopeShape, + isCodecEnvelope, + KNOWN_CODECS, + type MessageCodec, + MessageCodecEnum, + type MessageCodecHandler, + type MessageCodecRegistration, +} from './codec/messageCodec.ts' export { DoNotProcessMessageError } from './errors/DoNotProcessError.ts' export { isMessageError, diff --git a/packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts b/packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts index fa62f183..807fc84e 100644 --- a/packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts +++ b/packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts @@ -1,5 +1,16 @@ import { z } from 'zod/v4' +/** + * Upper bound for the declared size of an offloaded payload, in bytes (256 MiB). + * + * The size is read from an incoming (potentially untrusted) pointer and used to + * pre-allocate a buffer when streaming the payload back from the store. 
Capping it + * here means a malformed or hostile pointer claiming a multi-gigabyte size fails + * schema validation (and is routed to the DLQ) instead of triggering a huge + * allocation. 256 MiB is far above any realistic queue payload. + */ +export const MAX_OFFLOADED_PAYLOAD_SIZE = 256 * 1024 * 1024 + /** * Multi-store payload reference schema. * Contains information about where and how the payload was stored. @@ -10,7 +21,18 @@ export const PAYLOAD_REF_SCHEMA = z.object({ /** Name/identifier of the store where the payload is stored */ store: z.string().min(1), /** Size of the payload in bytes */ - size: z.number().int().positive(), + size: z.number().int().positive().max(MAX_OFFLOADED_PAYLOAD_SIZE), + /** + * Codec used to compress the stored payload. + * When set, the stored bytes are raw compressed binary (not base64 JSON). + * The consumer must decompress using this codec before parsing. + * + * Kept as a plain string to accommodate user-supplied codec names registered + * via the `{ name, handler }` form of the `codec` option — the set of valid + * names is not known statically. Handler resolution validates the name at + * use time and throws if no matching handler is found. + */ + codec: z.string().min(1).optional(), }) export type PayloadRef = z.output @@ -31,7 +53,7 @@ export const OFFLOADED_PAYLOAD_POINTER_PAYLOAD_SCHEMA = z payloadRef: PAYLOAD_REF_SCHEMA.optional(), // Legacy payload reference, preserved for backward compatibility. offloadedPayloadPointer: z.string().min(1).optional(), - offloadedPayloadSize: z.number().int().positive().optional(), + offloadedPayloadSize: z.number().int().positive().max(MAX_OFFLOADED_PAYLOAD_SIZE).optional(), }) // Pass-through allows to pass message ID, type, timestamp and message-deduplication-related fields that are using dynamic keys. 
.passthrough() diff --git a/packages/core/lib/payload-store/payloadStoreTypes.ts b/packages/core/lib/payload-store/payloadStoreTypes.ts index da3d95f9..ba9d56ab 100644 --- a/packages/core/lib/payload-store/payloadStoreTypes.ts +++ b/packages/core/lib/payload-store/payloadStoreTypes.ts @@ -30,7 +30,22 @@ export interface PayloadSerializer { * Use this when you have only one payload store. */ export type SinglePayloadStoreConfig = { - /** Threshold in bytes after which the payload should be stored in the store. */ + /** + * Wire-body size threshold in bytes. Messages whose wire body exceeds this value + * are offloaded to the store; smaller messages are sent inline. + * + * **What counts as "wire body size"** depends on whether a codec is active: + * - Without codec: the UTF-8 byte length of `JSON.stringify(message)`. + * - With codec: the byte length of the codec envelope + * (`{"__mqtCodec":"zstd","__mqtData":""}`) — i.e. the compressed + * payload after base64 encoding and JSON framing (~4/3 of the compressed size). + * + * Because codec reduces payload size before the threshold is applied, enabling + * codec effectively raises the bar for offloading: a 500 KB message that + * compresses to 100 KB will not trigger a 200 KB threshold. + * Size your threshold accordingly, or set it to the protocol's hard limit + * (e.g. `SQS_MESSAGE_MAX_SIZE`) to offload only when strictly necessary. + */ messageSizeThreshold: number /** The store to use for storing the payload. */ @@ -51,7 +66,22 @@ export type SinglePayloadStoreConfig = { * Use this when you need to support multiple payload stores (e.g., for migration). */ export type MultiPayloadStoreConfig = { - /** Threshold in bytes after which the payload should be stored in the store. */ + /** + * Wire-body size threshold in bytes. Messages whose wire body exceeds this value + * are offloaded to the store; smaller messages are sent inline. 
+ * + * **What counts as "wire body size"** depends on whether a codec is active: + * - Without codec: the UTF-8 byte length of `JSON.stringify(message)`. + * - With codec: the byte length of the codec envelope + * (`{"__mqtCodec":"zstd","__mqtData":""}`) — i.e. the compressed + * payload after base64 encoding and JSON framing (~4/3 of the compressed size). + * + * Because codec reduces payload size before the threshold is applied, enabling + * codec effectively raises the bar for offloading: a 500 KB message that + * compresses to 100 KB will not trigger a 200 KB threshold. + * Size your threshold accordingly, or set it to the protocol's hard limit + * (e.g. `SQS_MESSAGE_MAX_SIZE`) to offload only when strictly necessary. + */ messageSizeThreshold: number /** Map of store identifiers to store instances. */ diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index 40fa618b..b88c220f 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -1,3 +1,9 @@ +import { randomUUID } from 'node:crypto' +import * as fs from 'node:fs' +import * as os from 'node:os' +import * as path from 'node:path' +import { Readable } from 'node:stream' +import { pipeline } from 'node:stream/promises' import { types } from 'node:util' import { type CommonLogger, @@ -14,6 +20,8 @@ import { } from '@message-queue-toolkit/schemas' import { getProperty, setProperty } from 'dot-prop' import type { ZodSchema, ZodType } from 'zod/v4' +import { buildCodecEnvelope, getCodecName, resolveCodecHandler } from '../codec/codecHandler.ts' +import type { MessageCodecHandler, MessageCodecRegistration } from '../codec/messageCodec.ts' import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors.ts' import { type AcquireLockTimeoutError, @@ -36,6 +44,7 @@ import { import type { MultiPayloadStoreConfig, PayloadStore, + SerializedPayload, SinglePayloadStoreConfig, 
} from '../payload-store/payloadStoreTypes.ts' import { isDestroyable, isMultiPayloadStoreConfig } from '../payload-store/payloadStoreTypes.ts' @@ -48,7 +57,7 @@ import type { QueueOptions, } from '../types/queueOptionsTypes.ts' import { isRetryDateExceeded } from '../utils/dateUtils.ts' -import { streamWithKnownSizeToString } from '../utils/streamUtils.ts' +import { streamWithKnownSizeToBuffer, streamWithKnownSizeToString } from '../utils/streamUtils.ts' import { toDatePreprocessor } from '../utils/toDateProcessor.ts' import type { BarrierCallback, @@ -135,6 +144,15 @@ export abstract class AbstractQueueService< protected readonly messageDeduplicationConfig?: MessageDeduplicationConfig protected readonly messageMetricsManager?: MessageMetricsManager protected readonly _handlerSpy?: HandlerSpy + protected readonly skipCompressionBelow: number + protected readonly disableCodecAutoDetection: boolean + /** + * Codec handler resolved from the `codec` option, used by `prepareOutgoingPayload`. + * Undefined when no codec is configured (the common case). + */ + protected readonly resolvedCodecHandler?: MessageCodecHandler + /** Codec name matching `resolvedCodecHandler`, written into every codec envelope. */ + protected readonly resolvedCodecName?: string protected isInitted: boolean @@ -173,6 +191,24 @@ export abstract class AbstractQueueService< : undefined this.messageDeduplicationConfig = options.messageDeduplicationConfig + // Codec options live on role-specific option types (`codec`/`skipCompressionBelow` + // on publishers, `disableCodecAutoDetection` on consumers), so they are not part of + // the shared `QueueOptions` constraint. The base class handles both roles, hence the + // localized widening cast. 
+ const codecOptions = options as Partial<{ + codec: MessageCodecRegistration + skipCompressionBelow: number + disableCodecAutoDetection: boolean + }> + // Clamp to a non-negative integer: a negative or fractional floor is meaningless + // (a negative value would behave like 0 — "always compress" — anyway). + this.skipCompressionBelow = Math.max(0, Math.trunc(codecOptions.skipCompressionBelow ?? 512)) + this.disableCodecAutoDetection = codecOptions.disableCodecAutoDetection ?? false + if (codecOptions.codec) { + this.resolvedCodecHandler = resolveCodecHandler(codecOptions.codec) + this.resolvedCodecName = getCodecName(codecOptions.codec) + } + this.logMessages = options.logMessages ?? false this._handlerSpy = resolveHandlerSpy(options) this.isInitted = false @@ -657,24 +693,86 @@ export abstract class AbstractQueueService< } /** - * Offload message payload to an external store if it exceeds the threshold. - * Returns a special type that contains a pointer to the offloaded payload or the original payload if it was not offloaded. - * Requires message size as only the implementation knows how to calculate it. + * Collects the identity/routing fields that must remain visible in plaintext on the + * wire even when the message body is compressed or offloaded: the configured id, + * timestamp and deduplication fields, plus the message `type` (resolved via + * `messageTypeResolver`, defaulting to the conventional top-level `type` path). + * + * Keeping these as plaintext siblings of an offload pointer / codec envelope is what + * lets routing and downstream filtering keep working. Without the `type` fallback, + * `messageTypeResolver` modes that don't specify a body path silently strip `type`, + * breaking downstream SNS subscription FilterPolicy filters (`FilterPolicyScope: + * 'MessageBody'`). Shared by `buildPointer` (offload) and the codec-envelope path + * (`prepareOutgoingPayload`) so both behave identically. 
+ */ + protected collectPreservedFields(message: MessagePayloadSchemas): Record { + const fields: Record = { + // @ts-expect-error dynamic field access + [this.messageIdField]: message[this.messageIdField], + // @ts-expect-error dynamic field access + [this.messageTimestampField]: message[this.messageTimestampField], + // @ts-expect-error dynamic field access + [this.messageDeduplicationIdField]: message[this.messageDeduplicationIdField], + // @ts-expect-error dynamic field access + [this.messageDeduplicationOptionsField]: message[this.messageDeduplicationOptionsField], + } + + const typePath = + this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver) + ? this.messageTypeResolver.messageTypePath + : 'type' + const typeValue = getProperty(message, typePath) + if (typeValue !== undefined) { + setProperty(fields, typePath, typeValue) + } + + return fields + } + + /** + * Builds an OffloadedPayloadPointerPayload from the given message and storage metadata. + * Identity/routing fields are preserved through offloading via {@link collectPreservedFields}. + */ + private buildPointer( + message: MessagePayloadSchemas, + payloadId: string, + storeName: string, + size: number, + codecName?: string, + ): OffloadedPayloadPointerPayload { + return { + payloadRef: { + id: payloadId, + store: storeName, + size, + ...(codecName ? { codec: codecName } : {}), + }, + offloadedPayloadPointer: payloadId, + offloadedPayloadSize: size, + ...this.collectPreservedFields(message), + } as OffloadedPayloadPointerPayload + } + + /** + * Offloads the message payload to the configured store if it exceeds the size threshold. + * Returns null if no offloading is needed (store not configured or message fits within threshold). * * For multi-store configuration, uses the configured outgoingStore. * For single-store configuration, uses the single store. * - * The returned payload includes both the new payloadRef format and legacy fields for backward compatibility. 
+ * The returned pointer includes both the new payloadRef format and legacy fields for backward + * compatibility. The message type field is always preserved through offloading. */ - protected async offloadMessagePayloadIfNeeded( + protected async offloadPayload( message: MessagePayloadSchemas, messageSizeFn: () => number, - ): Promise { - if ( - !this.payloadStoreConfig || - messageSizeFn() <= this.payloadStoreConfig.messageSizeThreshold - ) { - return message + ): Promise { + if (!this.payloadStoreConfig) { + return null + } + + if (messageSizeFn() <= this.payloadStoreConfig.messageSizeThreshold) { + return null } const { store, storeName } = this.resolveOutgoingStore() @@ -689,45 +787,276 @@ export abstract class AbstractQueueService< } } - // Return message with both new and legacy formats for backward compatibility - const result: OffloadedPayloadPointerPayload = { - // Extended payload reference format - payloadRef: { - id: payloadId, - store: storeName, - size: serializedPayload.size, - }, - // Legacy format for backward compatibility - offloadedPayloadPointer: payloadId, - offloadedPayloadSize: serializedPayload.size, - // @ts-expect-error - [this.messageIdField]: message[this.messageIdField], - // @ts-expect-error - [this.messageTimestampField]: message[this.messageTimestampField], - // @ts-expect-error - [this.messageDeduplicationIdField]: message[this.messageDeduplicationIdField], - // @ts-expect-error - [this.messageDeduplicationOptionsField]: message[this.messageDeduplicationOptionsField], + return this.buildPointer(message, payloadId, storeName, serializedPayload.size) + } + + /** + * Compresses (when codec is configured) or offloads (when a payload store is configured) + * the outgoing message. Shared by all publisher subclasses via the `resolvedCodecHandler` + * / `resolvedCodecName` fields resolved from the `codec` option in the base constructor. 
+ * + * Returns: + * - `{ payload, preBuiltBody }` — `preBuiltBody` is a ready-to-send wire body string + * (codec envelope, or plain JSON when compression was skipped); `sendMessage` must + * use it as-is. + * - `{ payload }` — the payload is sent through the normal `JSON.stringify` path + * (no codec configured), optionally replaced by an offloaded-payload pointer. + */ + protected async prepareOutgoingPayload(message: MessagePayloadSchemas): Promise<{ + payload: MessagePayloadSchemas | OffloadedPayloadPointerPayload + preBuiltBody?: string + }> { + const handler = this.resolvedCodecHandler + const codecName = this.resolvedCodecName + + if (handler && codecName) { + return this.prepareCompressedOutgoingPayload(message, handler, codecName) } - // Preserve the message type field through offloading. We default to the conventional - // top-level `type` path so that routing/identity fields are handled consistently with - // `messageIdField`/`messageTimestampField`/etc., which have defaulted names ('id', - // 'timestamp', ...) and are always copied across when present. Without this fallback, - // `messageTypeResolver` modes that don't specify a body path (no resolver, `literal`, - // or `resolver`) silently strip `type` from the offloaded SNS body, which then breaks - // any downstream subscription whose FilterPolicy filters on `type` - // (FilterPolicyScope: 'MessageBody'). - const typePath = - this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver) - ? this.messageTypeResolver.messageTypePath - : 'type' - const typeValue = getProperty(message, typePath) - if (typeValue !== undefined) { - setProperty(result, typePath, typeValue) + return { + payload: + (await this.offloadPayload(message, () => this.calculateOutgoingMessageSize(message))) ?? + message, + } + } + + /** + * Codec branch of {@link prepareOutgoingPayload}, extracted so each method stays within + * the cognitive-complexity budget. 
+ * + * With a payload store, serialization is delegated to {@link compressAndOffloadPayload}, + * which uses the store serializer's reported size to honor `skipCompressionBelow` — the + * message is never separately `JSON.stringify`'d, so a streaming serializer is not forced + * to fully materialize a large payload. Without a payload store, the wire body is bounded + * by the SQS/SNS 256 KB transport limit (and is a string parameter that cannot be + * streamed), so a single in-memory `JSON.stringify` is both necessary and safe. + */ + private async prepareCompressedOutgoingPayload( + message: MessagePayloadSchemas, + handler: MessageCodecHandler, + codecName: string, + ): Promise<{ + payload: MessagePayloadSchemas | OffloadedPayloadPointerPayload + preBuiltBody?: string + }> { + if (this.payloadStoreConfig) { + return this.compressAndOffloadPayload(message, handler, codecName) + } + + // No offload store: serialize exactly once. The result is the codec input and, when + // compression is skipped, the plain-JSON wire body. + const json = JSON.stringify(message) + + // Skip compression for messages below the configured floor — small payloads often + // grow rather than shrink when compressed. + if (Buffer.byteLength(json, 'utf8') < this.skipCompressionBelow) { + return { payload: message, preBuiltBody: json } } - return result + // Identity/routing fields are copied as plaintext envelope siblings so broker-side + // filtering (e.g. SNS body-scoped FilterPolicy) keeps working on compressed messages — + // mirroring how offloaded-payload pointers preserve these fields. + const compressed = await handler.compress(Buffer.from(json, 'utf8')) + return { + payload: message, + preBuiltBody: buildCodecEnvelope(compressed, codecName, this.collectPreservedFields(message)), + } + } + + /** + * Estimates the wire size in bytes of the codec envelope wrapping `compressedSize` + * compressed bytes. 
The envelope is `{...preservedFields,"__mqtCodec":"", + * "__mqtData":""}`: base64 expands the payload to `⌈N/3⌉×4`, the fixed JSON + * framing adds 32 chars plus the codec name length, and any preserved sibling fields + * add their serialized length. + * + * Note: this measures the envelope body only — transport-specific message attributes + * (small, and identical with or without codec) are not included. + */ + protected estimateCodecEnvelopeSize( + compressedSize: number, + codecName: string, + preservedFields?: Record, + ): number { + let size = Math.ceil(compressedSize / 3) * 4 + 32 + codecName.length + if (preservedFields) { + // Merged into the envelope, the preserved fields cost their serialized content + // minus the two outer braces ("{}"), plus one joining comma. An empty object + // ("{}", 2 bytes) contributes nothing. Measured in UTF-8 bytes throughout. + const serializedBytes = Buffer.byteLength(JSON.stringify(preservedFields), 'utf8') + if (serializedBytes > 2) { + size += serializedBytes - 1 + } + } + return size + } + + /** + * Returns the wire size of the outgoing message in bytes, used by `offloadPayload` to decide + * whether the payload exceeds `messageSizeThreshold`. + * + * Overridden by publisher subclasses (SQS, SNS) to call their transport-specific utility. + * Not called on the consumer path; consumers do not override this method. + */ + protected calculateOutgoingMessageSize(_message: MessagePayloadSchemas): number { + /* c8 ignore next */ + throw new Error('calculateOutgoingMessageSize must be implemented by the publisher subclass') + } + + /** + * Store-path handler for codec publishers (both `codec` and `payloadStoreConfig` set). + * + * Serializes the message **once**, through the payload store's serializer (which may + * stream large payloads). 
`skipCompressionBelow` is evaluated against the size the + * serializer reports — there is no separate `JSON.stringify`, so a streaming serializer + * is never forced to fully materialize the payload just to evaluate the floor. + * + * - **Below `skipCompressionBelow`:** compression is skipped — the payload is sent inline + * as plain JSON, or offloaded uncompressed if it still exceeds `messageSizeThreshold` + * (possible only when the threshold is configured below the floor). + * - **Otherwise:** the payload is compressed and either inlined as a codec envelope or + * offloaded as raw compressed bytes (see {@link compressSerializedPayload}). + * + * Returns the same `{ payload, preBuiltBody? }` shape as {@link prepareOutgoingPayload}. + */ + protected async compressAndOffloadPayload( + message: MessagePayloadSchemas, + handler: MessageCodecHandler, + codecName: string, + ): Promise<{ + payload: MessagePayloadSchemas | OffloadedPayloadPointerPayload + preBuiltBody?: string + }> { + if (!this.payloadStoreConfig) { + throw new Error('Payload store is not configured') + } + const threshold = this.payloadStoreConfig.messageSizeThreshold + + // Serialize once. `serialized.size` is the authoritative wire size — used to honor + // `skipCompressionBelow` without a redundant `JSON.stringify` of the whole message. + const serialized = await this.payloadStoreConfig.serializer.serialize(message) + + try { + // Below the compression floor: skip compression entirely. The payload is offloaded + // uncompressed if it still exceeds the threshold (only possible when the threshold + // is set below the floor), otherwise sent inline as plain JSON. 
+ if (serialized.size < this.skipCompressionBelow) { + if (serialized.size > threshold) { + const { store, storeName } = this.resolveOutgoingStore() + const payloadId = await store.storePayload(serialized) + return { payload: this.buildPointer(message, payloadId, storeName, serialized.size) } + } + return { + payload: message, + preBuiltBody: + typeof serialized.value === 'string' + ? serialized.value + : await streamWithKnownSizeToString(serialized.value, serialized.size), + } + } + + return await this.compressSerializedPayload( + message, + serialized, + handler, + codecName, + threshold, + ) + } finally { + if (isDestroyable(serialized)) { + await serialized.destroy() + } + } + } + + /** + * Compresses an already-serialized payload and decides — against the **codec envelope + * wire size** (base64-encoded compressed bytes + JSON framing), not the raw compressed + * byte count — whether to send it inline as a codec envelope or offload the compressed + * bytes. Compression does not always shrink data, so the base64 envelope can exceed + * `messageSizeThreshold` even when the raw payload did not. + * + * - **In-memory fast path:** a string payload below `messageSizeThreshold` is compressed + * directly into a Buffer — no temp file is created, no disk I/O occurs. + * - **Streaming path:** a stream payload (or large string) is piped through the codec + * Transform into a temp file, so no full-payload buffer is materialized. + * + * Split out of {@link compressAndOffloadPayload} for the cognitive-complexity budget. + */ + private async compressSerializedPayload( + message: MessagePayloadSchemas, + serialized: SerializedPayload, + handler: MessageCodecHandler, + codecName: string, + threshold: number, + ): Promise<{ + payload: MessagePayloadSchemas | OffloadedPayloadPointerPayload + preBuiltBody?: string + }> { + // Identity/routing fields are copied as plaintext envelope siblings so broker-side + // filtering (e.g. 
SNS body-scoped FilterPolicy) keeps working on compressed messages. + const preservedFields = this.collectPreservedFields(message) + + // In-memory fast path: avoid disk I/O entirely for small string payloads. + if (typeof serialized.value === 'string' && serialized.size < threshold) { + const compressed = await handler.compress(Buffer.from(serialized.value, 'utf8')) + if ( + this.estimateCodecEnvelopeSize(compressed.length, codecName, preservedFields) > threshold + ) { + const { store, storeName } = this.resolveOutgoingStore() + const payloadId = await store.storePayload({ + value: Readable.from(compressed), + size: compressed.length, + }) + return { + payload: this.buildPointer(message, payloadId, storeName, compressed.length, codecName), + } + } + return { + payload: message, + preBuiltBody: buildCodecEnvelope(compressed, codecName, preservedFields), + } + } + + // Streaming pipeline: serializer output → codec transform → temp file. + // No full-payload buffer is materialised; each codec supplies its own Transform. + const tmpPath = path.join(os.tmpdir(), randomUUID()) + try { + await pipeline( + typeof serialized.value === 'string' ? Readable.from(serialized.value) : serialized.value, + handler.createCompressStream(), + fs.createWriteStream(tmpPath), + ) + + const compressedSize = (await fs.promises.stat(tmpPath)).size + + if (this.estimateCodecEnvelopeSize(compressedSize, codecName, preservedFields) > threshold) { + const { store, storeName } = this.resolveOutgoingStore() + const payloadId = await store.storePayload({ + value: fs.createReadStream(tmpPath), + size: compressedSize, + }) + return { + payload: this.buildPointer(message, payloadId, storeName, compressedSize, codecName), + } + } + + // Compressed payload fits inline — wrap the buffer in a codec envelope. 
+ return { + payload: message, + preBuiltBody: buildCodecEnvelope( + await fs.promises.readFile(tmpPath), + codecName, + preservedFields, + ), + } + } finally { + try { + await fs.promises.unlink(tmpPath) + } catch { + // ignore cleanup errors + } + } } /** @@ -735,9 +1064,17 @@ export abstract class AbstractQueueService< * Returns the original payload or an error if the payload was not found or could not be parsed. * * Supports both new multi-store format (payloadRef) and legacy format (offloadedPayloadPointer). + * + * When `resolveDecompressor` is provided and the pointer's `payloadRef.codec` is set, the + * fetched bytes are treated as raw compressed binary and decompressed before JSON parsing. + * `resolveDecompressor` is invoked *outside* the catch block: if it throws (e.g. the codec + * named in the pointer is not registered on this consumer — a deployment misconfiguration, + * not a bad message), the throw propagates as a retriable error so the message stays on the + * queue instead of being silently routed to the DLQ. */ protected async retrieveOffloadedMessagePayload( maybeOffloadedPayloadPointerPayload: unknown, + resolveDecompressor?: (codec: string) => (data: Buffer) => Promise, ): Promise> { if (!this.payloadStoreConfig) { return { @@ -787,6 +1124,34 @@ export abstract class AbstractQueueService< } } + const codec = parsedPayload.payloadRef?.codec + if (codec && resolveDecompressor) { + // Resolve the decompressor OUTSIDE the try/catch. An unknown/unregistered codec is a + // consumer misconfiguration, not a poison message — letting it throw here makes it a + // retriable error (the message stays on the queue and becomes visible again after the + // visibility timeout) instead of being silently routed to the DLQ. + const decompress = resolveDecompressor(codec) + // The stream read is likewise kept outside the try/catch so transient retrieval errors + // (truncated S3 stream, network blip) propagate as thrown exceptions and are retried. 
+ // Only deterministic failures (corrupt compressed bytes, invalid JSON after + // decompression) are caught and returned as { error }, which the consumer treats as a + // poison message and routes to the DLQ. This mirrors the non-codec path below. + const compressedBuffer = await streamWithKnownSizeToBuffer( + serializedOffloadedPayloadReadable, + payloadSize, + ) + try { + const decompressed = await decompress(compressedBuffer) + return { result: JSON.parse(decompressed.toString('utf8')) } + } catch (e) { + return { + error: new Error(`Failed to decompress offloaded payload with codec "${codec}"`, { + cause: e, + }), + } + } + } + const serializedOffloadedPayloadString = await streamWithKnownSizeToString( serializedOffloadedPayloadReadable, payloadSize, diff --git a/packages/core/lib/types/queueOptionsTypes.ts b/packages/core/lib/types/queueOptionsTypes.ts index d8f26445..d06ce9e1 100644 --- a/packages/core/lib/types/queueOptionsTypes.ts +++ b/packages/core/lib/types/queueOptionsTypes.ts @@ -1,5 +1,6 @@ import type { CommonLogger, ErrorReporter, ErrorResolver } from '@lokalise/node-core' import type { ZodSchema } from 'zod/v4' +import type { MessageCodecRegistration } from '../codec/messageCodec.ts' import type { MessageDeduplicationConfig } from '../message-deduplication/messageDeduplicationTypes.ts' import type { PayloadStoreConfig } from '../payload-store/payloadStoreTypes.ts' import type { MessageHandlerConfig } from '../queues/HandlerContainer.ts' @@ -292,6 +293,48 @@ export type QueuePublisherOptions< > = QueueOptions & { messageSchemas: readonly ZodSchema[] enablePublisherDeduplication?: boolean + /** + * Compression codec applied to outgoing message bodies. + * + * Every outgoing message body is compressed and wrapped in a self-describing + * envelope `{ __mqtCodec: 'zstd', __mqtData: '' }`. 
Consumers auto-detect + * the envelope and decompress transparently — no consumer-side option is needed + * for built-in codecs, so mixed (compressed/uncompressed) queues work seamlessly. + * + * Only the SQS and SNS adapters support compression; AMQP and Pub/Sub publishers + * throw at construction time if a codec is supplied. + * + * Uses Node.js built-in `zlib` zstd support — **requires Node.js >=22.15.0** (or >=23.8.0). + * + * @example + * import { MessageCodecEnum } from '@message-queue-toolkit/core' + * + * new MyPublisher(deps, { codec: MessageCodecEnum.ZSTD }) + */ + codec?: MessageCodecRegistration + /** + * Minimum serialized size in bytes a message must reach before compression is applied. + * Only meaningful when `codec` is set. Defaults to `512`. + * + * Small messages often expand rather than shrink when compressed due to algorithm + * framing overhead. When the UTF-8 JSON representation of a message is strictly + * smaller than this value, the message is sent as plain JSON instead of a codec + * envelope, avoiding the compression cost with no loss of correctness. + * + * This floor is honored regardless of whether `payloadStoreConfig` is also set: + * a message below the threshold is sent (or offloaded) as plain JSON without + * compression. + * + * Set to `0` to compress every message regardless of size. 
+ * + * @example + * // Compress only messages ≥ 1 KB + * new MyPublisher(deps, { codec: MessageCodecEnum.ZSTD, skipCompressionBelow: 1024 }) + * + * // Always compress (disable the floor) + * new MyPublisher(deps, { codec: MessageCodecEnum.ZSTD, skipCompressionBelow: 0 }) + */ + skipCompressionBelow?: number } export type DeadLetterQueueOptions< diff --git a/packages/core/lib/utils/streamUtils.ts b/packages/core/lib/utils/streamUtils.ts index 7ea2155f..b94c79f7 100644 --- a/packages/core/lib/utils/streamUtils.ts +++ b/packages/core/lib/utils/streamUtils.ts @@ -1,6 +1,6 @@ import type { Readable } from 'node:stream' -export async function streamWithKnownSizeToString(stream: Readable, size: number): Promise { +export async function streamWithKnownSizeToBuffer(stream: Readable, size: number): Promise { const buffer = Buffer.alloc(size) let offset = 0 @@ -14,5 +14,12 @@ export async function streamWithKnownSizeToString(stream: Readable, size: number offset += chunkBuffer.length } - return buffer.toString('utf8', 0, offset) + // Copy only when the stream delivered fewer bytes than expected so the + // full backing allocation is not retained via a shared-memory view. + return offset === size ? buffer : Buffer.from(buffer.subarray(0, offset)) +} + +export async function streamWithKnownSizeToString(stream: Readable, size: number): Promise { + const buffer = await streamWithKnownSizeToBuffer(stream, size) + return buffer.toString('utf8') } diff --git a/packages/core/package.json b/packages/core/package.json index 041db97b..bd8aec0d 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/core", - "version": "25.5.0", + "version": "26.0.0", "license": "MIT", "description": "Useful utilities, interfaces and base classes for message queue handling. 
Supports AMQP and SQS with a common abstraction on top currently", "maintainers": [ diff --git a/packages/core/test/codec/codecHandler.spec.ts b/packages/core/test/codec/codecHandler.spec.ts new file mode 100644 index 00000000..ad09b136 --- /dev/null +++ b/packages/core/test/codec/codecHandler.spec.ts @@ -0,0 +1,59 @@ +import { describe, expect, it } from 'vitest' + +import { buildCodecEnvelope } from '../../lib/codec/codecHandler.ts' +import { hasCodecEnvelopeShape, isCodecEnvelope } from '../../lib/codec/messageCodec.ts' + +const COMPRESSED = Buffer.from('compressed-bytes') +const COMPRESSED_B64 = COMPRESSED.toString('base64') + +describe('buildCodecEnvelope', () => { + it('emits a two-field envelope when no preserved fields are given', () => { + expect(JSON.parse(buildCodecEnvelope(COMPRESSED, 'zstd'))).toEqual({ + __mqtCodec: 'zstd', + __mqtData: COMPRESSED_B64, + }) + }) + + it('emits a two-field envelope when preserved fields are an empty object', () => { + expect(JSON.parse(buildCodecEnvelope(COMPRESSED, 'zstd', {}))).toEqual({ + __mqtCodec: 'zstd', + __mqtData: COMPRESSED_B64, + }) + }) + + it('emits preserved fields as plaintext siblings of the codec fields', () => { + const envelope = JSON.parse( + buildCodecEnvelope(COMPRESSED, 'zstd', { id: 'm-1', type: 'permissions.add' }), + ) + expect(envelope).toEqual({ + id: 'm-1', + type: 'permissions.add', + __mqtCodec: 'zstd', + __mqtData: COMPRESSED_B64, + }) + }) + + it('escapes preserved field values that contain JSON metacharacters', () => { + const envelope = JSON.parse( + buildCodecEnvelope(COMPRESSED, 'zstd', { id: 'quote " and \\ backslash' }), + ) + expect(envelope.id).toBe('quote " and \\ backslash') + expect(envelope.__mqtData).toBe(COMPRESSED_B64) + }) + + it('codec fields always win over a colliding preserved field name', () => { + const envelope = JSON.parse( + buildCodecEnvelope(COMPRESSED, 'zstd', { __mqtData: 'tampered', __mqtCodec: 'tampered' }), + ) + 
expect(envelope.__mqtData).toBe(COMPRESSED_B64) + expect(envelope.__mqtCodec).toBe('zstd') + }) + + it('produces an envelope still recognised by isCodecEnvelope and hasCodecEnvelopeShape', () => { + const envelope = JSON.parse( + buildCodecEnvelope(COMPRESSED, 'zstd', { id: 'm-1', type: 'permissions.add' }), + ) + expect(isCodecEnvelope(envelope)).toBe(true) + expect(hasCodecEnvelopeShape(envelope)).toBe(true) + }) +}) diff --git a/packages/core/test/codec/messageCodec.spec.ts b/packages/core/test/codec/messageCodec.spec.ts new file mode 100644 index 00000000..958bec0c --- /dev/null +++ b/packages/core/test/codec/messageCodec.spec.ts @@ -0,0 +1,158 @@ +import { describe, expect, it } from 'vitest' + +import { + hasCodecEnvelopeShape, + isCodecEnvelope, + MessageCodecEnum, +} from '../../lib/codec/messageCodec.ts' + +const VALID_BASE64 = Buffer.from('hello compressed world').toString('base64') + +describe('isCodecEnvelope — custom knownCodecs', () => { + const CUSTOM_CODECS = new Set(['lz4', 'brotli']) + + it('returns true when the envelope codec is in the supplied knownCodecs set', () => { + expect(isCodecEnvelope({ __mqtCodec: 'lz4', __mqtData: VALID_BASE64 }, CUSTOM_CODECS)).toBe( + true, + ) + }) + + it('returns false for a built-in codec when it is not in the supplied knownCodecs set', () => { + // zstd is valid with default knownCodecs but must be rejected when not in the custom set + expect( + isCodecEnvelope( + { __mqtCodec: MessageCodecEnum.ZSTD, __mqtData: VALID_BASE64 }, + CUSTOM_CODECS, + ), + ).toBe(false) + }) + + it('returns false for a codec that is in neither the default nor the supplied set', () => { + expect(isCodecEnvelope({ __mqtCodec: 'gzip', __mqtData: VALID_BASE64 }, CUSTOM_CODECS)).toBe( + false, + ) + }) +}) + +describe('isCodecEnvelope', () => { + describe('valid envelopes', () => { + it('returns true for a well-formed envelope', () => { + expect(isCodecEnvelope({ __mqtCodec: MessageCodecEnum.ZSTD, __mqtData: VALID_BASE64 })).toBe( + true, 
+ ) + }) + + it('returns true for an envelope with empty base64 data (zero-byte payload)', () => { + expect(isCodecEnvelope({ __mqtCodec: MessageCodecEnum.ZSTD, __mqtData: '' })).toBe(true) + }) + }) + + describe('preserved sibling fields — detection is presence-based', () => { + it('returns true when the envelope carries preserved sibling fields (id, type, …)', () => { + // Publishers copy identity/routing fields alongside the codec fields so broker-side + // filtering (e.g. SNS body-scoped FilterPolicy) keeps working on compressed + // messages. Detection is presence-based and must accept these extra siblings, + // mirroring how offloaded-payload pointers are detected by marker-field presence. + expect( + isCodecEnvelope({ + __mqtCodec: MessageCodecEnum.ZSTD, + __mqtData: VALID_BASE64, + id: 'real-message', + type: 'permissions.add', + timestamp: '2026-05-22T00:00:00.000Z', + }), + ).toBe(true) + }) + + it('returns false when only __mqtCodec is present (no __mqtData)', () => { + expect(isCodecEnvelope({ __mqtCodec: MessageCodecEnum.ZSTD })).toBe(false) + }) + + it('returns false when only __mqtData is present (no __mqtCodec)', () => { + expect(isCodecEnvelope({ __mqtData: VALID_BASE64 })).toBe(false) + }) + }) + + describe('invalid __mqtCodec values', () => { + it('returns false for an unknown codec name', () => { + expect(isCodecEnvelope({ __mqtCodec: 'gzip', __mqtData: VALID_BASE64 })).toBe(false) + }) + + it('returns false when __mqtCodec is not a string', () => { + expect(isCodecEnvelope({ __mqtCodec: 42, __mqtData: VALID_BASE64 })).toBe(false) + }) + }) + + describe('invalid __mqtData values — base64 validation', () => { + it('returns false for a non-base64 string', () => { + expect( + isCodecEnvelope({ __mqtCodec: MessageCodecEnum.ZSTD, __mqtData: 'not base64!!!' 
}), + ).toBe(false) + }) + + it('returns false when __mqtData is not a string', () => { + expect(isCodecEnvelope({ __mqtCodec: MessageCodecEnum.ZSTD, __mqtData: 123 })).toBe(false) + }) + + it('returns false for a base64 string with incorrect padding', () => { + // Valid base64 chars but wrong padding length + expect(isCodecEnvelope({ __mqtCodec: MessageCodecEnum.ZSTD, __mqtData: 'abc' })).toBe(false) + }) + }) + + describe('non-object inputs', () => { + it('returns false for null', () => { + expect(isCodecEnvelope(null)).toBe(false) + }) + + it('returns false for a string', () => { + expect(isCodecEnvelope('{"__mqtCodec":"zstd","__mqtData":""}')).toBe(false) + }) + + it('returns false for a number', () => { + expect(isCodecEnvelope(42)).toBe(false) + }) + + it('returns false for an empty object', () => { + expect(isCodecEnvelope({})).toBe(false) + }) + }) +}) + +describe('hasCodecEnvelopeShape', () => { + it('returns true for an envelope naming a codec unknown to this consumer', () => { + // Structural check — unlike isCodecEnvelope it does not consult a knownCodecs set, + // so an envelope for an unregistered codec is still recognised (and can be surfaced + // as a misconfiguration rather than processed as an incomplete message). 
+ expect(hasCodecEnvelopeShape({ __mqtCodec: 'lz4', __mqtData: VALID_BASE64 })).toBe(true) + }) + + it('returns true for an envelope carrying preserved sibling fields', () => { + expect( + hasCodecEnvelopeShape({ + __mqtCodec: MessageCodecEnum.ZSTD, + __mqtData: VALID_BASE64, + id: 'm-1', + type: 'permissions.add', + }), + ).toBe(true) + }) + + it('returns false when __mqtCodec is an empty string', () => { + expect(hasCodecEnvelopeShape({ __mqtCodec: '', __mqtData: VALID_BASE64 })).toBe(false) + }) + + it('returns false when a marker field is missing', () => { + expect(hasCodecEnvelopeShape({ __mqtCodec: 'lz4' })).toBe(false) + expect(hasCodecEnvelopeShape({ __mqtData: VALID_BASE64 })).toBe(false) + }) + + it('returns false when __mqtData is not valid base64', () => { + expect(hasCodecEnvelopeShape({ __mqtCodec: 'lz4', __mqtData: 'not base64!!!' })).toBe(false) + }) + + it('returns false for non-object inputs', () => { + expect(hasCodecEnvelopeShape(null)).toBe(false) + expect(hasCodecEnvelopeShape('a string')).toBe(false) + }) +}) diff --git a/packages/core/test/queues/AbstractQueueService.offload.spec.ts b/packages/core/test/queues/AbstractQueueService.offload.spec.ts index 2d6142ac..32167f30 100644 --- a/packages/core/test/queues/AbstractQueueService.offload.spec.ts +++ b/packages/core/test/queues/AbstractQueueService.offload.spec.ts @@ -1,5 +1,5 @@ /** - * Regression tests for `AbstractQueueService.offloadMessagePayloadIfNeeded`. + * Regression tests for `AbstractQueueService.offloadPayload`. * * Identity fields (`messageIdField`, `messageTimestampField`, `messageDeduplicationIdField`, * `messageDeduplicationOptionsField`) all have defaulted names ('id', 'timestamp', ...) and @@ -58,7 +58,7 @@ class TestQueueService extends AbstractQueueService< // Expose protected method for direct testing. 
public callOffload(message: TestMessage, sizeFn: () => number) { - return this.offloadMessagePayloadIfNeeded(message, sizeFn) + return this.offloadPayload(message, sizeFn) } } @@ -102,7 +102,7 @@ const baseMessage: TestMessage = { payload: { large: 'data' }, } -describe('AbstractQueueService.offloadMessagePayloadIfNeeded — `type` preservation', () => { +describe('AbstractQueueService.offloadPayload — `type` preservation', () => { it('preserves `type` when no messageTypeResolver is configured', async () => { const svc = buildService(undefined) const result = (await svc.callOffload( diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts index 5927afa0..aa96a474 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts @@ -42,6 +42,12 @@ export abstract class AbstractPubSubPublisher MessagePayloadType >, ) { + if (options.codec) { + throw new Error( + 'codec is not supported by AbstractPubSubPublisher. Remove the codec option or use an SQS/SNS publisher.', + ) + } + super(dependencies, options) this.messageSchemaContainer = this.resolvePublisherMessageSchemaContainer(options) @@ -69,11 +75,12 @@ export abstract class AbstractPubSubPublisher const parsedMessage = messageSchemaResult.result.parse(message) message = this.updateInternalProperties(message) - const maybeOffloadedPayloadMessage = await this.offloadMessagePayloadIfNeeded(message, () => { - // Calculate message size for PubSub - const messageData = Buffer.from(JSON.stringify(message)) - return messageData.length - }) + const maybeOffloadedPayloadMessage = + (await this.offloadPayload(message, () => { + // Calculate message size for PubSub + const messageData = Buffer.from(JSON.stringify(message)) + return messageData.length + })) ?? 
message if ( this.isDeduplicationEnabledForMessage(parsedMessage) && diff --git a/packages/gcp-pubsub/package.json b/packages/gcp-pubsub/package.json index e731daef..6478656e 100644 --- a/packages/gcp-pubsub/package.json +++ b/packages/gcp-pubsub/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/gcp-pubsub", - "version": "2.2.0", + "version": "3.0.0", "license": "MIT", "description": "Google Cloud Pub/Sub adapter for message-queue-toolkit", "maintainers": [ @@ -32,7 +32,7 @@ }, "peerDependencies": { "@google-cloud/pubsub": "^5.2.0", - "@message-queue-toolkit/core": ">=25.0.0", + "@message-queue-toolkit/core": ">=26.0.0", "zod": ">=3.25.76 <5.0.0" }, "devDependencies": { @@ -40,7 +40,7 @@ "@biomejs/biome": "^2.3.8", "@lokalise/biome-config": "^3.1.0", "@lokalise/tsconfig": "^3.0.0", - "@message-queue-toolkit/core": "*", + "@message-queue-toolkit/core": "workspace:*", "@message-queue-toolkit/gcs-payload-store": "*", "@message-queue-toolkit/redis-message-deduplication-store": "*", "@message-queue-toolkit/schemas": "*", diff --git a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.spec.ts b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.spec.ts index 8e99e475..29d3c873 100644 --- a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.spec.ts +++ b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.spec.ts @@ -23,6 +23,15 @@ describe('PubSubPermissionPublisher', () => { await diContainer.dispose() }) + describe('constructor', () => { + it('throws when codec option is set (codec is not supported by Pub/Sub publishers)', () => { + // The guard fires before super() so real dependencies are not required. 
+ expect(() => new PubSubPermissionPublisher({} as any, { codec: 'zstd' } as any)).toThrow( + 'codec is not supported by AbstractPubSubPublisher', + ) + }) + }) + describe('init', () => { it('creates a new topic', async () => { const newPublisher = diContainer.cradle.permissionPublisher diff --git a/packages/sns/README.md b/packages/sns/README.md index 3eb9cb2e..44872985 100644 --- a/packages/sns/README.md +++ b/packages/sns/README.md @@ -58,6 +58,7 @@ npm install @message-queue-toolkit/sns @message-queue-toolkit/sqs @message-queue - ✅ **Handler spies** for testing - ✅ **Pre-handlers and barriers** for complex message processing - ✅ **Cross-account and cross-region publishing** +- ✅ **Message compression** with zstd via Node.js built-in `zlib` (Node.js >=22.15.0 required) ## Core Concepts @@ -683,6 +684,10 @@ await consumer.start() // Optional - Payload Offloading (same as SQS) payloadStoreConfig: { /* ... */ }, + // Optional - Compression (Node.js >=22.15.0 required) + codec: MessageCodecEnum.ZSTD, // Compress every outgoing message with zstd + skipCompressionBelow: 512, // Skip compression for messages smaller than 512 bytes (default: 512) + // Optional - Deletion deletionConfig: { /* ... */ }, } @@ -1005,6 +1010,7 @@ SNS consumers inherit all advanced features from SQS consumers. 
See the SQS READ - **[Message Retry Logic](../sqs/README.md#message-retry-logic)** - Exponential backoff and retry limits - **[Message Deduplication](../sqs/README.md#message-deduplication)** - Publisher and consumer-level deduplication - **[Payload Offloading](../sqs/README.md#payload-offloading)** - S3 storage for large messages +- **[Message Compression](../sqs/README.md#message-compression)** - zstd compression via Node.js built-in `zlib` - **[Message Handlers](../sqs/README.md#message-handlers)** - Type-safe handler configuration - **[Pre-handlers and Barriers](../sqs/README.md#pre-handlers-and-barriers)** - Middleware and message dependencies - **[Handler Spies](../sqs/README.md#handler-spies)** - Testing async message flows diff --git a/packages/sns/lib/sns/AbstractSnsPublisher.ts b/packages/sns/lib/sns/AbstractSnsPublisher.ts index f6c14ad4..910954fd 100644 --- a/packages/sns/lib/sns/AbstractSnsPublisher.ts +++ b/packages/sns/lib/sns/AbstractSnsPublisher.ts @@ -125,19 +125,34 @@ export abstract class AbstractSnsPublisher const topicName = this.locatorConfig?.topicName ?? this.creationConfig?.topic?.Name ?? 'unknown' + // Fast read-only pre-check: skip compression/offload for messages already known to + // be duplicates. This does NOT persist a dedup key, so a transient failure in the + // expensive work below leaves no key behind and the publish stays safely retriable. 
+ if (await this.isMessageDuplicated(parsedMessage, DeduplicationRequesterEnum.Publisher)) { + this.handleMessageProcessed({ + message: parsedMessage, + processingResult: { status: 'published', skippedAsDuplicate: true }, + messageProcessingStartTimestamp, + queueName: topicName, + }) + return + } + const updatedMessage = this.updateInternalProperties(message) // Resolve FIFO options from original message BEFORE offloading // (offloaded payload won't have user fields needed for messageGroupIdField) const resolvedOptions = this.resolveFifoOptions(updatedMessage, options) - const maybeOffloadedPayloadMessage = await this.offloadMessagePayloadIfNeeded( - updatedMessage, - () => calculateOutgoingMessageSize(updatedMessage), - ) + const { payload, preBuiltBody } = await this.prepareOutgoingPayload(updatedMessage) + // Persist the dedup key only now — immediately before send — so the window in which + // a stored key plus a failed send could drop the message on retry stays as small as + // possible and no longer spans compression/offload (a transient failure there leaves + // no key behind). The pre-check above already skipped the expensive work for the + // common duplicate case; this check additionally closes the race where a concurrent + // publish stored the key in the meantime. 
if ( - this.isDeduplicationEnabledForMessage(parsedMessage) && (await this.deduplicateMessage(parsedMessage, DeduplicationRequesterEnum.Publisher)) .isDuplicated ) { @@ -150,7 +165,7 @@ export abstract class AbstractSnsPublisher return } - await this.sendMessage(maybeOffloadedPayloadMessage, resolvedOptions) + await this.sendMessage(payload, resolvedOptions, preBuiltBody) this.handleMessageProcessed({ message: parsedMessage, @@ -206,13 +221,21 @@ export abstract class AbstractSnsPublisher return this.isDeduplicationEnabled && super.isDeduplicationEnabledForMessage(message) } + protected override calculateOutgoingMessageSize(message: MessagePayloadType): number { + return calculateOutgoingMessageSize(message) + } + protected async sendMessage( payload: MessagePayloadType | OffloadedPayloadPointerPayload, options: SNSMessageOptions, + preBuiltBody?: string, ): Promise { const attributes = resolveOutgoingMessageAttributes(payload) + // preBuiltBody is set when codec is active and the payload was not offloaded — + // it contains the already-compressed codec envelope, so we skip re-serialization. + const body = preBuiltBody ?? 
JSON.stringify(payload) const command = new PublishCommand({ - Message: JSON.stringify(payload), + Message: body, MessageAttributes: attributes, TopicArn: this.topicArn, ...options, diff --git a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts index 9e1113e1..80eeb0f3 100644 --- a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts +++ b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts @@ -1,5 +1,11 @@ import type { SNSClient } from '@aws-sdk/client-sns' import type { STSClient } from '@aws-sdk/client-sts' +import type { Either } from '@lokalise/node-core' +import type { + MessageInvalidFormatError, + MessageValidationError, + ResolvedMessage, +} from '@message-queue-toolkit/core' import type { SQSConsumerDependencies, SQSConsumerOptions, @@ -35,6 +41,10 @@ export type SNSSQSConsumerOptions< SNSSQSCreationConfig, SNSSQSQueueLocatorType > & + // Intersected after SQSConsumerOptions (rather than wrapping it) so its discriminated + // union (fifoQueue: true | false) is preserved and Extract<…, {fifoQueue:true}> works. + // Codec consumer options (`codecs`, `disableCodecAutoDetection`) come from + // SQSConsumerOptions; SNSOptions carries no codec fields. 
SNSOptions & { subscriptionConfig?: SNSSubscriptionOptions } @@ -201,7 +211,9 @@ export abstract class AbstractSnsSqsConsumer< await this.startConsumers() } - protected override resolveMessage(message: SQSMessage) { + protected override resolveMessage( + message: SQSMessage, + ): Either { const result = readSnsMessage(message, this.errorResolver) if (result.result) { return result diff --git a/packages/sns/package.json b/packages/sns/package.json index 305c0d09..37224df1 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sns", - "version": "24.7.0", + "version": "25.0.0", "private": false, "license": "MIT", "description": "SNS adapter for message-queue-toolkit", @@ -38,7 +38,7 @@ "@aws-sdk/client-sns": "^3.632.0", "@aws-sdk/client-sqs": "^3.1034.0", "@aws-sdk/client-sts": "^3.632.0", - "@message-queue-toolkit/core": ">=24.0.0", + "@message-queue-toolkit/core": ">=26.0.0", "@message-queue-toolkit/schemas": ">=7.0.0", "@message-queue-toolkit/sqs": ">=23.0.0", "zod": ">=3.25.76 <5.0.0" @@ -50,9 +50,9 @@ "@biomejs/biome": "^2.3.6", "@lokalise/biome-config": "^3.1.0", "@lokalise/tsconfig": "^3.0.0", - "@message-queue-toolkit/core": "*", - "@message-queue-toolkit/redis-message-deduplication-store": "*", - "@message-queue-toolkit/s3-payload-store": "*", + "@message-queue-toolkit/core": "workspace:*", + "@message-queue-toolkit/redis-message-deduplication-store": "workspace:*", + "@message-queue-toolkit/s3-payload-store": "workspace:*", "@message-queue-toolkit/sqs": "workspace:*", "@types/node": "^25.0.2", "@vitest/coverage-v8": "^4.0.15", diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts new file mode 100644 index 00000000..6b8c84c7 --- /dev/null +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts @@ -0,0 +1,109 @@ +import type { AwilixContainer } from 'awilix' +import { asValue } from 
'awilix' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' + +import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher.ts' +import type { TestAwsResourceAdmin } from '../utils/testAdmin.ts' +import type { Dependencies } from '../utils/testContext.ts' +import { registerDependencies } from '../utils/testContext.ts' +import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer.ts' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas.ts' + +describe('SnsSqsPermissionConsumer - zstd codec', () => { + let diContainer: AwilixContainer + let testAdmin: TestAwsResourceAdmin + let publisher: SnsPermissionPublisher + let consumer: SnsSqsPermissionConsumer + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + testAdmin = diContainer.cradle.testAdmin + }) + + beforeEach(async () => { + await testAdmin.deleteQueues(SnsSqsPermissionConsumer.CONSUMED_QUEUE_NAME) + await testAdmin.deleteTopics(SnsSqsPermissionConsumer.SUBSCRIBED_TOPIC_NAME) + + // No codec option needed — zstd is auto-registered on every consumer. + consumer = new SnsSqsPermissionConsumer(diContainer.cradle) + // skipCompressionBelow: 0 forces compression for these small test messages so the + // suite genuinely exercises the codec path (they are well under the 512 B default). 
+ publisher = new SnsPermissionPublisher(diContainer.cradle, { + codec: 'zstd', + skipCompressionBelow: 0, + }) + + await consumer.start() + await publisher.init() + }) + + afterEach(async () => { + await publisher.close() + await consumer.close() + }) + + afterAll(async () => { + const { awilixManager } = diContainer.cradle + await awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('publishes a compressed SNS message and consumer decompresses it correctly', async () => { + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'sns-codec-test-1', + messageType: 'add', + metadata: { info: 'hello sns zstd' }, + } + + await publisher.publish(message) + + const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.message).toMatchObject(message) + }, 15000) + + it('consumer correctly handles multiple compressed SNS messages in sequence', async () => { + const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = [ + { id: 'sns-codec-seq-1', messageType: 'add' }, + { id: 'sns-codec-seq-2', messageType: 'add' }, + { id: 'sns-codec-seq-3', messageType: 'add' }, + ] + + for (const msg of messages) { + await publisher.publish(msg) + } + + for (const msg of messages) { + const result = await consumer.handlerSpy.waitForMessageWithId(msg.id, 'consumed') + expect(result.message).toMatchObject(msg) + } + }, 15000) + + it('consumer without codec option auto-detects and decompresses zstd messages from SNS', async () => { + // Stop the beforeEach consumer so it cannot steal messages from the shared queue + await consumer.close() + + // Consumer without explicit codec — decompression is auto-detected from envelope __mqtCodec field + const autoConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + locatorConfig: { + queueUrl: consumer.subscriptionProps.queueUrl, + topicArn: consumer.subscriptionProps.topicArn, + subscriptionArn: consumer.subscriptionProps.subscriptionArn, + }, + }) + await autoConsumer.start() + // 
Reassign so afterEach closes autoConsumer instead of the already-closed consumer + consumer = autoConsumer + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'sns-codec-auto-1', + messageType: 'add', + } + await publisher.publish(message) + + const result = await autoConsumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.message).toMatchObject(message) + }, 15000) +}) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts index 68abd72e..b4f967ae 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts @@ -41,6 +41,7 @@ type SnsSqsPermissionConsumerOptions = Pick< | 'maxRetryDuration' | 'payloadStoreConfig' | 'concurrentConsumersAmount' + | 'codecs' > & { addPreHandlerBarrier?: ( message: SupportedMessages, @@ -148,6 +149,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< deleteIfExists: false, }, payloadStoreConfig: options.payloadStoreConfig, + codecs: options.codecs, consumerOverrides: options.consumerOverrides ?? 
{ terminateVisibilityTimeout: true, // this allows to retry failed messages immediately }, diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.ts b/packages/sns/test/publishers/SnsPermissionPublisher.ts index e5c2dda7..da9dbe0c 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.ts @@ -26,6 +26,8 @@ export class SnsPermissionPublisher extends AbstractSnsPublisher | 'payloadStoreConfig' | 'messageDeduplicationConfig' | 'enablePublisherDeduplication' + | 'codec' + | 'skipCompressionBelow' >, ) { super(dependencies, { @@ -40,6 +42,8 @@ export class SnsPermissionPublisher extends AbstractSnsPublisher deleteIfExists: false, }, payloadStoreConfig: options?.payloadStoreConfig, + codec: options?.codec, + skipCompressionBelow: options?.skipCompressionBelow, messageSchemas: [PERMISSIONS_ADD_MESSAGE_SCHEMA, PERMISSIONS_REMOVE_MESSAGE_SCHEMA], handlerSpy: true, messageTypeResolver: { messageTypePath: 'messageType' }, diff --git a/packages/sqs/README.md b/packages/sqs/README.md index 686470b6..6cfb7d09 100644 --- a/packages/sqs/README.md +++ b/packages/sqs/README.md @@ -24,6 +24,7 @@ for publishing and consuming messages from both standard and FIFO SQS queues. 
- [Message Retry Logic](#message-retry-logic) - [Message Deduplication](#message-deduplication) - [Payload Offloading](#payload-offloading) + - [Message Compression](#message-compression) - [Message Handlers](#message-handlers) - [Pre-handlers and Barriers](#pre-handlers-and-barriers) - [Handler Spies](#handler-spies) @@ -62,6 +63,7 @@ npm install @message-queue-toolkit/sqs @message-queue-toolkit/core - ✅ **Handler spies** for testing - ✅ **Pre-handlers and barriers** for complex message processing - ✅ **Automatic queue creation** with validation +- ✅ **Message compression** with zstd via Node.js built-in `zlib` (Node.js >=22.15.0 required) ## Core Concepts @@ -460,6 +462,9 @@ When using `locatorConfig`, you connect to an existing queue without creating it maxPayloadSize: 1024 * 1024, // 1 MiB }, + // Optional - Compression (Node.js >=22.15.0 required) + codec: MessageCodecEnum.ZSTD, // Compress every outgoing message with zstd + // Optional - Deletion deletionConfig: { deleteIfExists: false, // Delete queue on init @@ -538,6 +543,9 @@ When using `locatorConfig`, you connect to an existing queue without creating it payloadStore: s3Store, }, + // Note: consumers have no `codec` option — auto-detection handles built-in zstd. + // Use `codecs: [{ name: 'lz4', handler: new LZ4Handler() }]` only for custom codecs. + // Optional - Other logMessages: false, handlerSpy: true, @@ -788,16 +796,78 @@ await publisher.publish({ }) ``` -**How it works:** +**How it works (without codec):** 1. Publisher checks message size before sending -2. If size exceeds `maxPayloadSize`, stores payload in S3 -3. Replaces payload with pointer: `{ _offloadedPayload: { bucketName, key, size } }` -4. Sends pointer message to SQS -5. Consumer detects pointer, fetches payload from S3 -6. Processes message with full payload +2. If size exceeds `messageSizeThreshold`, serializes and stores payload in S3 +3. Sends a lightweight pointer message to SQS instead +4. 
Consumer detects the pointer, fetches payload from S3 +5. Processes message with full payload + +**How it works (with codec — compress + offload):** +1. Publisher compresses the serialized message with zstd **once**, up-front +2. If the resulting **codec envelope** (base64-encoded compressed bytes plus JSON framing) exceeds `messageSizeThreshold`, stores the compressed bytes in S3 and sends a pointer +3. If the envelope fits within the threshold, sends the message inline as that codec envelope +4. Consumer fetches the pointer payload as raw bytes, decompresses, then processes as normal + +The codec embedded in `payloadRef.codec` tells the consumer which algorithm to use — no `codec` option is needed on the consumer. **Note:** Payload cleanup is the responsibility of the store (e.g., S3 lifecycle policies). +### Message Compression + +Compress message bodies with zstd using the Node.js built-in `zlib` module. Requires **Node.js >=22.15.0**. + +The codec implementation ships inside `@message-queue-toolkit/core` — no extra package to install. Compression is opt-in: it is only active when you set the `codec` option on a publisher. + +Compressed messages are **self-describing**: the codec is embedded in the message envelope (`{ __mqtCodec: 'zstd', __mqtData: '<base64-encoded compressed bytes>', ...preserved fields }`), so a consumer decompresses them automatically via envelope detection, without any codec configuration of its own. + +> **Roll out consumers before publishers.** Auto-detection only works on a consumer running a library version that supports the codec. Upgrade and deploy all consumers of a queue **first** (they keep handling plain messages unchanged), and only then enable `codec` on publishers. If a publisher emits compressed messages to a consumer on an older library version — or to a consumer missing a required custom codec — that consumer will fail to process those messages. 
Such a missing-codec failure is treated as a **retriable** error (a misconfiguration, not a poison message): the message stays on the queue and is retried until the codec is registered, rather than being dropped or sent to the DLQ. This holds for both inline and offloaded compressed messages. + +#### Publisher + +```typescript +import { MessageCodecEnum } from '@message-queue-toolkit/core' + +class MyPublisher extends AbstractSqsPublisher { + constructor(deps: SQSDependencies) { + super(deps, { + codec: MessageCodecEnum.ZSTD, // compress every outgoing message + creationConfig: { queue: { QueueName: 'my-queue' } }, + // ... + }) + } +} +``` + +#### Consumer + +```typescript +class MyConsumer extends AbstractSqsConsumer { + constructor(deps: SQSConsumerDependencies) { + super(deps, { + // No codec option needed for built-in zstd — auto-detection handles it. + // For a custom codec: codecs: [{ name: 'lz4', handler: new LZ4Handler() }] + creationConfig: { queue: { QueueName: 'my-queue' } }, + handlers: new MessageHandlerConfigBuilder() + .addConfig(MySchema, myHandler) + .build(), + }, executionContext) + } +} +``` + +#### Notes + +- Compression is applied **after** schema validation and **before** the SQS `SendMessage` call. +- The message is compressed **exactly once**, regardless of whether payload offloading is also configured. When both features are active: the payload is compressed first, and the decision to offload is made against the **codec envelope wire size** (base64-encoded compressed bytes + JSON framing) rather than the raw or compressed byte count. This means smaller payloads after compression may stay inline and never touch S3. +- The compressed bytes are **never re-compressed** when sent inline — the codec envelope is built directly from the first (and only) compression pass. +- Compressed payloads are still subject to the SQS 256 KB message size limit. 
Without a payload store, an inline codec envelope that still exceeds 256 KB is rejected by AWS at send time — exactly as an oversized uncompressed message would be. For messages that remain oversized after compression, combine with [Payload Offloading](#payload-offloading). The compressed payload is then stored in S3 and the `payloadRef.codec` field records the algorithm so the consumer can decompress after retrieval without any extra configuration. +- On the consumer side, decompression is **buffer-based** (not streamed): an offloaded compressed payload is fetched in full and decompressed in memory before `JSON.parse`. The decompressed size is bounded by `ZstdCodecHandler`'s `maxDecompressedBytes` (default 100 MiB), which also guards against decompression-bomb inputs. +- Uses `MessageCodecEnum.ZSTD` (value `'zstd'`). You can use the string literal or the enum — both satisfy the `MessageCodec` type. +- **`skipCompressionBelow`** (default `512`): minimum UTF-8 byte size a message must reach before compression is applied. Messages strictly below this threshold are sent as plain JSON — small payloads often expand when compressed due to framing overhead. Set to `0` to compress every message regardless of size. Example: `{ codec: MessageCodecEnum.ZSTD, skipCompressionBelow: 1024 }`. +- **Routing/filtering fields are preserved.** The codec envelope carries the message's identity and routing fields (`id`, `timestamp`, `type`, and any deduplication fields) as plaintext siblings of `__mqtData` — the same fields an offloaded-payload pointer preserves. SNS subscription filter policies scoped to `MessageBody` therefore keep working on those fields. A filter policy that references **other** body fields will not match, because the rest of the payload is compressed inside `__mqtData`. 
+- A consumer that receives an envelope for a codec it has **not** registered (an unregistered custom codec) records it as an error rather than processing the partial envelope — register the codec via the `codecs` option. + ### Message Handlers Handlers process messages based on their type. Messages are routed to the appropriate handler using the discriminator field (configurable via `messageTypeResolver`): diff --git a/packages/sqs/bench/codec.bench.ts b/packages/sqs/bench/codec.bench.ts new file mode 100644 index 00000000..fe987433 --- /dev/null +++ b/packages/sqs/bench/codec.bench.ts @@ -0,0 +1,204 @@ +/** + * Codec integration benchmarks — publish and consume throughput with vs without zstd compression. + * + * Run: pnpm --filter @message-queue-toolkit/sqs bench + * + * Each benchmark pre-fills queues (consume) or sends N messages (publish) and + * measures wall-clock time, reporting msg/s and the overhead percentage. + * All queues are deleted before and after each case. + * + * LIMITATION: N=50 against LocalStack means results are dominated by network + * round-trips (~5–20 ms each), not compression CPU cost (~0.1–1 ms). These + * numbers show end-to-end throughput, not codec overhead. For the codec CPU cost + * in isolation see bench/codecMicro.bench.ts, which asserts time ceilings (local runs, not CI). + * These integration benchmarks print to console only and cannot catch regressions. 
+ */ +import type { AwilixContainer } from 'awilix' +import { asValue } from 'awilix' +import { afterAll, beforeAll, describe, it } from 'vitest' + +import { SqsPermissionConsumer } from '../test/consumers/SqsPermissionConsumer.ts' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from '../test/consumers/userConsumerSchemas.ts' +import { SqsPermissionPublisher } from '../test/publishers/SqsPermissionPublisher.ts' +import type { TestAwsResourceAdmin } from '../test/utils/testAdmin.ts' +import type { Dependencies } from '../test/utils/testContext.ts' +import { registerDependencies } from '../test/utils/testContext.ts' + +// ─── Configuration ──────────────────────────────────────────────────────────── + +const N = 50 + +/** Small message with minimal payload (~80 B serialised). */ +const SMALL_META: undefined = undefined + +/** + * Large message with repetitive text (~6 KB serialised). + * Repetitive content compresses very well, showing the realistic best case. + */ +const LARGE_META: Record = { + description: 'The quick brown fox jumps over the lazy dog. '.repeat(60), + items: Array.from({ length: 80 }, (_, i) => ({ + id: `item-${i}`, + value: `value-number-${i}`, + enabled: i % 2 === 0, + })), +} + +const CASES = [ + { label: 'small payload (~80 B) ', suffix: 'sm', meta: SMALL_META }, + { label: 'large payload (~6 KB) ', suffix: 'lg', meta: LARGE_META }, +] as const + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +function makeMessages( + prefix: string, + count: number, + meta: Record | undefined, +): PERMISSIONS_ADD_MESSAGE_TYPE[] { + return Array.from({ length: count }, (_, i) => ({ + id: `${prefix}-${i}`, + messageType: 'add' as const, + ...(meta !== undefined ? 
{ metadata: meta } : {}), + })) +} + +function printRow(label: string, count: number, plainMs: number, codecMs: number): void { + const tps = (ms: number) => ((count / ms) * 1000).toFixed(0).padStart(6) + const diff = codecMs - plainMs + const pct = ((diff / plainMs) * 100).toFixed(1) + const sign = diff >= 0 ? '+' : '' + console.log( + ` ${label}` + + ` plain: ${String(plainMs.toFixed(0)).padStart(5)} ms (${tps(plainMs)} msg/s)` + + ` zstd: ${String(codecMs.toFixed(0)).padStart(5)} ms (${tps(codecMs)} msg/s)` + + ` overhead: ${sign}${pct}%`, + ) +} + +// ─── Suite ──────────────────────────────────────────────────────────────────── + +describe('SQS codec benchmarks', () => { + let diContainer: AwilixContainer + let testAdmin: TestAwsResourceAdmin + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + testAdmin = diContainer.cradle.testAdmin + }) + + afterAll(async () => { + const { awilixManager } = diContainer.cradle + await awilixManager.executeDispose() + await diContainer.dispose() + }) + + // ── Publish ──────────────────────────────────────────────────────────────── + + it(`publish: with vs without zstd (${N} messages)`, async () => { + console.log(`\n${'─'.repeat(72)}`) + console.log(` PUBLISH BENCHMARK — ${N} messages per run`) + console.log('─'.repeat(72)) + + for (const { label, suffix, meta } of CASES) { + const plainQ = `bench-pub-plain-${suffix}` + const codecQ = `bench-pub-codec-${suffix}` + await testAdmin.deleteQueues(plainQ, codecQ) + + const plainMsgs = makeMessages('bpp', N, meta) + const codecMsgs = makeMessages('bcp', N, meta) + + // ── Plain publish ── + const plainPub = new SqsPermissionPublisher(diContainer.cradle, { + creationConfig: { queue: { QueueName: plainQ } }, + }) + await plainPub.init() + const t0 = performance.now() + for (const msg of plainMsgs) await plainPub.publish(msg) + const plainMs = performance.now() 
- t0 + await plainPub.close() + + // ── Codec publish ── + const codecPub = new SqsPermissionPublisher(diContainer.cradle, { + codec: 'zstd', + creationConfig: { queue: { QueueName: codecQ } }, + }) + await codecPub.init() + const t1 = performance.now() + for (const msg of codecMsgs) await codecPub.publish(msg) + const codecMs = performance.now() - t1 + await codecPub.close() + + await testAdmin.deleteQueues(plainQ, codecQ) + printRow(label, N, plainMs, codecMs) + } + }, 120_000) + + // ── Consume ──────────────────────────────────────────────────────────────── + + it(`consume: with vs without zstd (${N} messages)`, async () => { + console.log(`\n${'─'.repeat(72)}`) + console.log(` CONSUME BENCHMARK — ${N} messages per run`) + console.log('─'.repeat(72)) + + for (const { label, suffix, meta } of CASES) { + const plainQ = `bench-con-plain-${suffix}` + const codecQ = `bench-con-codec-${suffix}` + await testAdmin.deleteQueues(plainQ, codecQ) + + const plainMsgs = makeMessages('bpc', N, meta) + const codecMsgs = makeMessages('bcc', N, meta) + + // ── Pre-fill plain queue ── + const plainPub = new SqsPermissionPublisher(diContainer.cradle, { + creationConfig: { queue: { QueueName: plainQ } }, + }) + await plainPub.init() + for (const msg of plainMsgs) await plainPub.publish(msg) + await plainPub.close() + + // ── Pre-fill codec queue ── + const codecPub = new SqsPermissionPublisher(diContainer.cradle, { + codec: 'zstd', + creationConfig: { queue: { QueueName: codecQ } }, + }) + await codecPub.init() + for (const msg of codecMsgs) await codecPub.publish(msg) + await codecPub.close() + + // ── Measure plain consume ── + // deletionConfig: { deleteIfExists: false } preserves the pre-filled queue + const plainCon = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { queue: { QueueName: plainQ } }, + deletionConfig: { deleteIfExists: false }, + }) + await plainCon.start() + const t2 = performance.now() + await Promise.all( + plainMsgs.map((m) => 
plainCon.handlerSpy.waitForMessageWithId(m.id, 'consumed')), + ) + const plainMs = performance.now() - t2 + await plainCon.close(true) + + // ── Measure codec consume ── + // No codec option needed — consumers auto-detect envelopes from __mqtCodec. + const codecCon = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { queue: { QueueName: codecQ } }, + deletionConfig: { deleteIfExists: false }, + }) + await codecCon.start() + const t3 = performance.now() + await Promise.all( + codecMsgs.map((m) => codecCon.handlerSpy.waitForMessageWithId(m.id, 'consumed')), + ) + const codecMs = performance.now() - t3 + await codecCon.close(true) + + await testAdmin.deleteQueues(plainQ, codecQ) + printRow(label, N, plainMs, codecMs) + } + }, 120_000) +}) diff --git a/packages/sqs/bench/codecMicro.bench.ts b/packages/sqs/bench/codecMicro.bench.ts new file mode 100644 index 00000000..f089469c --- /dev/null +++ b/packages/sqs/bench/codecMicro.bench.ts @@ -0,0 +1,83 @@ +/** + * Codec micro-benchmarks — compress/decompress latency in isolation (no network). + * + * Run manually: pnpm --filter @message-queue-toolkit/sqs bench + * + * These cases measure the CPU cost of the codec only, free from LocalStack network + * noise. Each runs ITERATIONS compress+decompress round-trips and asserts the total + * time is below a very conservative ceiling. + * + * This is a manual/local benchmark — it is NOT wired into CI (the `bench` script is + * separate from `test`). Wall-clock assertions are inherently flaky on shared CI + * runners, so the ceilings here are a sanity check for local runs, not a CI gate. + * + * Expected times on typical developer hardware: + * small payload (~100 B) → ~20–100 ms for 100 iterations + * large payload (~6 KB) → ~50–300 ms for 100 iterations + * + * The ceilings below are set at ~10× the expected worst case so that only a + * genuine algorithmic regression (or a severely starved runner) will fail. 
+ */ +import { ZstdCodecHandler } from '@message-queue-toolkit/core' +import { describe, expect, it } from 'vitest' + +const handler = new ZstdCodecHandler() +const ITERATIONS = 100 + +/** Small message — representative of the "skip compression" boundary (~100 B). */ +const SMALL = Buffer.from(JSON.stringify({ id: 'bench-small', messageType: 'add' }), 'utf8') + +/** Large message — repetitive text that compresses very well (~6 KB). */ +const LARGE = Buffer.from( + JSON.stringify({ + id: 'bench-large', + messageType: 'add', + metadata: { + description: 'The quick brown fox jumps over the lazy dog. '.repeat(60), + items: Array.from({ length: 80 }, (_, i) => ({ + id: `item-${i}`, + value: `value-number-${i}`, + enabled: i % 2 === 0, + })), + }, + }), + 'utf8', +) + +describe('ZstdCodecHandler micro-benchmark', () => { + it(`compress+decompress ${ITERATIONS}× small payload (${SMALL.byteLength} B) in under 2 s`, async () => { + const t0 = performance.now() + for (let i = 0; i < ITERATIONS; i++) { + const compressed = await handler.compress(SMALL) + await handler.decompress(compressed) + } + const elapsed = performance.now() - t0 + const perOp = (elapsed / ITERATIONS).toFixed(2) + console.log( + ` small: ${ITERATIONS} round-trips in ${elapsed.toFixed(0)} ms (${perOp} ms/op, ` + + `${((SMALL.byteLength * ITERATIONS) / elapsed / 1000).toFixed(2)} MB/s input)`, + ) + expect( + elapsed, + `${ITERATIONS} round-trips took ${elapsed.toFixed(0)} ms — possible regression`, + ).toBeLessThan(2000) + }) + + it(`compress+decompress ${ITERATIONS}× large payload (${LARGE.byteLength} B) in under 5 s`, async () => { + const t0 = performance.now() + for (let i = 0; i < ITERATIONS; i++) { + const compressed = await handler.compress(LARGE) + await handler.decompress(compressed) + } + const elapsed = performance.now() - t0 + const perOp = (elapsed / ITERATIONS).toFixed(2) + console.log( + ` large: ${ITERATIONS} round-trips in ${elapsed.toFixed(0)} ms (${perOp} ms/op, ` + + 
`${((LARGE.byteLength * ITERATIONS) / elapsed / 1000).toFixed(2)} MB/s input)`, + ) + expect( + elapsed, + `${ITERATIONS} round-trips took ${elapsed.toFixed(0)} ms — possible regression`, + ).toBeLessThan(5000) + }) +}) diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index d8a06b76..ddf665ff 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -10,8 +10,13 @@ import { type BarrierResult, type DeadLetterQueueOptions, DeduplicationRequesterEnum, + getCodecName, HandlerContainer, + hasCodecEnvelopeShape, isMessageError, + KNOWN_CODECS, + type MessageCodecHandler, + type MessageCodecRegistration, type MessageSchemaContainer, noopReleasableLock, type ParseMessageResult, @@ -21,6 +26,7 @@ import { type QueueConsumer, type QueueConsumerDependencies, type QueueConsumerOptions, + resolveCodecHandler, type TransactionObservabilityManager, } from '@message-queue-toolkit/core' import type { ConsumerOptions } from 'sqs-consumer' @@ -85,6 +91,39 @@ type SQSConsumerCommonOptions< SQSCreationConfig, SQSQueueLocatorType > & { + /** + * Additional codecs to register on this consumer. + * Built-in codecs (e.g. `MessageCodecEnum.ZSTD`) are always registered automatically. + * Any incoming message whose `__mqtCodec` name is not in the registry causes an error. + * + * Use this to support custom codecs published by a corresponding publisher: + * @example + * const codec = { name: 'lz4', handler: new MyLz4Handler() } + * new MyConsumer(deps, { codecs: [codec] }) + */ + codecs?: MessageCodecRegistration[] + /** + * Disables automatic codec-envelope detection on this consumer. + * + * By default, consumers inspect every incoming message body with `hasCodecEnvelopeShape`. + * If the body carries a `__mqtCodec` name plus a base64 `__mqtData` field, it is treated + * as compressed and decompressed before schema validation. 
Extra sibling fields are + * allowed (publishers copy `id`/`type`/… alongside the codec fields for broker-side + * filtering), so detection is presence-based, not an exact-shape match. + * + * Set this to `true` only if your message schema legitimately contains `__mqtCodec` and + * `__mqtData` fields and you do not want auto-detection to intercept them. + * + * **Warning:** with auto-detection disabled, a genuinely compressed message reaching + * this consumer is *not* decompressed — its raw envelope is handed straight to schema + * validation. Because the envelope carries preserved `id`/`type` siblings, a lenient + * (non-`.strict()`) schema may accept it as a valid-but-incomplete message instead of + * failing loudly. Only enable this on queues you are certain never receive compressed + * messages. + * + * @default false + */ + disableCodecAutoDetection?: boolean /** * Wait time in seconds the consumer passes to SQS ReceiveMessage (long-polling). * AWS allows integer values 0–20; anything else throws a RangeError at @@ -225,6 +264,8 @@ export abstract class AbstractSqsConsumer< private readonly barrierVisibilityExtensionIntervalInMsecs: number private readonly barrierVisibilityTimeoutInSeconds: number private readonly consumerPollingWaitTimeSeconds: number + /** Registry of codec name → handler. Seeded from all built-in codecs + options.codecs. */ + private readonly codecRegistry: ReadonlyMap protected deadLetterQueueUrl?: string protected readonly errorResolver: ErrorResolver @@ -276,6 +317,15 @@ export abstract class AbstractSqsConsumer< messageHandlers: options.handlers, }) this.isDeduplicationEnabled = !!options.enableConsumerDeduplication + // Build codec registry: always seed with all built-in codecs, then add user-supplied ones. + const registry = new Map() + for (const builtInName of KNOWN_CODECS) { + registry.set(builtInName, resolveCodecHandler(builtInName as MessageCodecRegistration)) + } + for (const registration of options.codecs ?? 
[]) { + registry.set(getCodecName(registration), resolveCodecHandler(registration)) + } + this.codecRegistry = registry } override async init(): Promise { @@ -363,7 +413,21 @@ export abstract class AbstractSqsConsumer< const messageProcessingStartTimestamp = Date.now() - const deserializedMessage = await this.deserializeMessage(message) + let deserializedMessage: Either<'abort', ParseMessageResult> + try { + deserializedMessage = await this.deserializeMessage(message) + } catch (err) { + // A throw out of deserialization — e.g. a message envelope (inline body or an + // offloaded-payload pointer) naming a codec this consumer has not registered — + // is terminal for this consumer. Treat it as an invalid message: the abort + // branch below routes it to the DLQ if one is configured. Retrying would be + // pointless — a missing-codec deployment misconfiguration is not fixed within + // the redelivery window, so the message would only burn its receive-count + // attempts (or, with no DLQ, spam the error reporter for the whole retention + // period) before ending up in the DLQ anyway. + this.handleError(err) + deserializedMessage = ABORT_EARLY_EITHER + } if (deserializedMessage.error === 'abort') { await this.failProcessing(message) @@ -915,19 +979,59 @@ export abstract class AbstractSqsConsumer< } // Empty content for whatever reason - if (!resolveMessageResult.result || !resolveMessageResult.result.body) { + if (!resolveMessageResult.result?.body) { return ABORT_EARLY_EITHER } if (hasOffloadedPayload(resolveMessageResult.result)) { const retrieveOffloadedMessagePayloadResult = await this.retrieveOffloadedMessagePayload( resolveMessageResult.result.body, + (codecName) => { + const handler = this.codecRegistry.get(codecName) + if (!handler) { + // The pointer names a codec this consumer has not registered. 
Throwing here + // (outside retrieveOffloadedMessagePayload's catch) propagates to handleMessage, + // which treats it as an invalid message and routes it to the DLQ (if one is + // configured) so it can be redriven once the codec is deployed. + throw new Error( + `No codec handler registered for "${codecName}". Register it via the consumer's \`codecs\` option.`, + ) + } + return (data) => handler.decompress(data) + }, ) if (retrieveOffloadedMessagePayloadResult.error) { this.handleError(retrieveOffloadedMessagePayloadResult.error) return ABORT_EARLY_EITHER } resolveMessageResult.result.body = retrieveOffloadedMessagePayloadResult.result + } else if ( + !this.disableCodecAutoDetection && + hasCodecEnvelopeShape(resolveMessageResult.result.body) + ) { + const envelope = resolveMessageResult.result.body + const handler = this.codecRegistry.get(envelope.__mqtCodec) + if (!handler) { + // Envelope-shaped body naming a codec this consumer has not registered. Throwing + // here propagates to handleMessage, which treats it as an invalid message and + // routes it to the DLQ (if one is configured) so it can be redriven once the codec + // is deployed. Throwing — rather than falling through — is essential: otherwise the + // envelope's preserved sibling fields (id, type, …) could satisfy a lenient schema + // and be processed as an incomplete message. Register the codec via the `codecs` + // option. + throw new Error( + `Received a message compressed with codec "${envelope.__mqtCodec}", which is not registered on this consumer. 
Register it via the \`codecs\` option.`, + ) + } + try { + const compressed = Buffer.from(envelope.__mqtData, 'base64') + resolveMessageResult.result.body = JSON.parse( + (await handler.decompress(compressed)).toString('utf8'), + ) + } catch (err) { + this.handleError(err as Error) + return ABORT_EARLY_EITHER + } } return resolveMessageResult @@ -942,8 +1046,13 @@ export abstract class AbstractSqsConsumer< const resolvedMessage = resolveMessageResult.result // Empty content for whatever reason - if (!resolvedMessage || !resolvedMessage.body) return ABORT_EARLY_EITHER + if (!resolvedMessage?.body) return ABORT_EARLY_EITHER + // Best-effort id extraction for logging/metrics only — this runs on the raw (still + // possibly compressed) body. A codec envelope from a built-in publisher carries the + // id as a preserved plaintext sibling, so it is found here; a bare envelope (e.g. from + // the `compressMessageBody` helper) has no id and falls through to abort, which only + // means the id is absent from a log line — message processing is unaffected. // @ts-expect-error if (this.messageIdField in resolvedMessage.body) { return { diff --git a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts index f4ed02d4..34ea0ed9 100644 --- a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts +++ b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts @@ -117,18 +117,34 @@ export abstract class AbstractSqsPublisher const messageProcessingStartTimestamp = Date.now() const parsedMessage = messageSchemaResult.result.parse(message) + // Fast read-only pre-check: skip compression/offload for messages already known to + // be duplicates. This does NOT persist a dedup key, so a transient failure in the + // expensive work below leaves no key behind and the publish stays safely retriable. 
+ if (await this.isMessageDuplicated(parsedMessage, DeduplicationRequesterEnum.Publisher)) { + this.handleMessageProcessed({ + message: parsedMessage, + processingResult: { status: 'published', skippedAsDuplicate: true }, + messageProcessingStartTimestamp, + queueName: this.queueName, + }) + return + } + message = this.updateInternalProperties(message) // Resolve FIFO options from original message BEFORE offloading // (offloaded payload won't have user fields needed for messageGroupIdField) const resolvedOptions = this.resolveFifoOptions(message, options) - const maybeOffloadedPayloadMessage = await this.offloadMessagePayloadIfNeeded(message, () => - calculateOutgoingMessageSize(message), - ) + const { payload, preBuiltBody } = await this.prepareOutgoingPayload(message) + // Persist the dedup key only now — immediately before send — so the window in which + // a stored key plus a failed send could drop the message on retry stays as small as + // possible and no longer spans compression/offload (a transient failure there leaves + // no key behind). The pre-check above already skipped the expensive work for the + // common duplicate case; this check additionally closes the race where a concurrent + // publish stored the key in the meantime. 
if ( - this.isDeduplicationEnabledForMessage(parsedMessage) && (await this.deduplicateMessage(parsedMessage, DeduplicationRequesterEnum.Publisher)) .isDuplicated ) { @@ -141,7 +157,7 @@ export abstract class AbstractSqsPublisher return } - await this.sendMessage(maybeOffloadedPayloadMessage, resolvedOptions) + await this.sendMessage(payload, resolvedOptions, preBuiltBody) this.handleMessageProcessed({ message: parsedMessage, processingResult: { status: 'published' }, @@ -199,16 +215,22 @@ export abstract class AbstractSqsPublisher return this.messageSchemaContainer.resolveSchema(message) } + protected override calculateOutgoingMessageSize(message: MessagePayloadType): number { + return calculateOutgoingMessageSize(message) + } + protected async sendMessage( payload: MessagePayloadType | OffloadedPayloadPointerPayload, options: SQSMessageOptions, + preBuiltBody?: string, ): Promise { const attributes = resolveOutgoingMessageAttributes(payload) - - // Options are already resolved in publish() before offloading + // preBuiltBody is set when codec is active and the payload was not offloaded — + // it contains the already-compressed codec envelope, so we skip re-serialization. + const body = preBuiltBody ?? 
JSON.stringify(payload) const command = new SendMessageCommand({ QueueUrl: this.queueUrl, - MessageBody: JSON.stringify(payload), + MessageBody: body, MessageAttributes: attributes, ...options, }) diff --git a/packages/sqs/package.json b/packages/sqs/package.json index 31231a1b..e8cb8200 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sqs", - "version": "24.3.0", + "version": "25.0.0", "private": false, "license": "MIT", "description": "SQS adapter for message-queue-toolkit", @@ -22,6 +22,7 @@ "scripts": { "build": "pnpm run clean && tsc --project tsconfig.build.json", "clean": "rimraf dist", + "bench": "vitest run --config vitest.bench.config.ts --reporter=verbose", "test": "vitest", "test:coverage": "pnpm run test --coverage", "lint": "biome check && tsc", @@ -36,7 +37,7 @@ }, "peerDependencies": { "@aws-sdk/client-sqs": "^3.1034.0", - "@message-queue-toolkit/core": ">=25.0.0", + "@message-queue-toolkit/core": ">=26.0.0", "zod": ">=3.25.76 <5.0.0" }, "devDependencies": { @@ -45,10 +46,10 @@ "@biomejs/biome": "^2.3.8", "@lokalise/biome-config": "^3.1.0", "@lokalise/tsconfig": "^3.0.0", - "@message-queue-toolkit/core": "*", - "@message-queue-toolkit/redis-message-deduplication-store": "*", - "@message-queue-toolkit/s3-payload-store": "*", - "@message-queue-toolkit/schemas": "*", + "@message-queue-toolkit/core": "workspace:*", + "@message-queue-toolkit/redis-message-deduplication-store": "workspace:*", + "@message-queue-toolkit/s3-payload-store": "workspace:*", + "@message-queue-toolkit/schemas": "workspace:*", "@types/node": "^25.0.2", "@vitest/coverage-v8": "^4.0.15", "awilix": "^13.0.3", diff --git a/packages/sqs/test/codec/codecHandler.spec.ts b/packages/sqs/test/codec/codecHandler.spec.ts new file mode 100644 index 00000000..abdec231 --- /dev/null +++ b/packages/sqs/test/codec/codecHandler.spec.ts @@ -0,0 +1,35 @@ +import { getCodecName, MessageCodecEnum } from 
'@message-queue-toolkit/core' +import { describe, expect, it } from 'vitest' + +describe('getCodecName', () => { + it('returns the string as-is for built-in codec enum values', () => { + expect(getCodecName(MessageCodecEnum.ZSTD)).toBe('zstd') + }) + + it('returns the name for a valid custom codec registration', () => { + expect(getCodecName({ name: 'lz4', handler: {} as any })).toBe('lz4') + expect(getCodecName({ name: 'my-codec_v2', handler: {} as any })).toBe('my-codec_v2') + }) + + it('throws for a custom codec name containing a double-quote', () => { + expect(() => getCodecName({ name: 'lz4"x', handler: {} as any })).toThrow( + 'Invalid codec name "lz4"x"', + ) + }) + + it('throws for a custom codec name containing a backslash', () => { + expect(() => getCodecName({ name: 'lz4\\x', handler: {} as any })).toThrow( + 'Invalid codec name "lz4\\x"', + ) + }) + + it('throws for a custom codec name containing whitespace', () => { + expect(() => getCodecName({ name: 'my codec', handler: {} as any })).toThrow( + 'Invalid codec name "my codec"', + ) + }) + + it('throws for an empty custom codec name', () => { + expect(() => getCodecName({ name: '', handler: {} as any })).toThrow('Invalid codec name ""') + }) +}) diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts new file mode 100644 index 00000000..56b428f6 --- /dev/null +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts @@ -0,0 +1,680 @@ +import type { Transform } from 'node:stream' +import { PassThrough } from 'node:stream' +import { ReceiveMessageCommand, SendMessageCommand } from '@aws-sdk/client-sqs' +import { waitAndRetry } from '@lokalise/node-core' +import type { MessageCodecHandler } from '@message-queue-toolkit/core' +import { + buildCodecEnvelope, + MessageCodecEnum, + resolveCodecHandler, +} from '@message-queue-toolkit/core' +import type { AwilixContainer } from 'awilix' +import { asValue } from 
'awilix' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' + +import { SqsPermissionPublisher } from '../publishers/SqsPermissionPublisher.ts' +import type { TestAwsResourceAdmin } from '../utils/testAdmin.ts' +import type { Dependencies } from '../utils/testContext.ts' +import { registerDependencies } from '../utils/testContext.ts' +import { SqsPermissionConsumer } from './SqsPermissionConsumer.ts' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas.ts' + +// Padding that pushes any test message's JSON representation comfortably above the +// default skipCompressionBelow threshold (512 bytes), ensuring compression is actually +// applied. Sized with generous margin so unrelated schema tweaks cannot drop a test +// message back under the threshold. +const LARGE_PADDING = 'x'.repeat(800) + +/** + * Builds a raw zstd codec envelope, simulating an external (non-mqt) producer that + * compressed the message itself. Deliberately omits the preserved sibling fields the + * built-in publisher adds — these tests exercise the bare-envelope path. 
+ */ +async function compressToZstdEnvelope(message: unknown): Promise { + const handler = resolveCodecHandler(MessageCodecEnum.ZSTD) + const compressed = await handler.compress(Buffer.from(JSON.stringify(message), 'utf8')) + return buildCodecEnvelope(compressed, MessageCodecEnum.ZSTD) +} + +describe('SqsPermissionConsumer - zstd codec', () => { + let diContainer: AwilixContainer + let testAdmin: TestAwsResourceAdmin + let publisher: SqsPermissionPublisher + let consumer: SqsPermissionConsumer + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + testAdmin = diContainer.cradle.testAdmin + }) + + beforeEach(async () => { + await testAdmin.deleteQueues(SqsPermissionConsumer.QUEUE_NAME) + + consumer = new SqsPermissionConsumer(diContainer.cradle, { + deletionConfig: { deleteIfExists: false }, + }) + publisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + }) + + await consumer.start() + await publisher.init() + }) + + afterEach(async () => { + await publisher.close() + await consumer.close(true) + }) + + afterAll(async () => { + const { awilixManager } = diContainer.cradle + await awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('publishes a compressed message and consumer decompresses it correctly', async () => { + // Message is padded to exceed the default skipCompressionBelow (512 bytes) so that + // compression actually fires. Without the padding the payload would be sent as plain + // JSON and the wire assertion below would fail. + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'codec-test-1', + messageType: 'add', + metadata: { info: 'hello zstd', padding: LARGE_PADDING }, + } + + // Wire assertion: verify the message is actually sent as a codec envelope. + // Uses an isolated queue with no consumer to avoid a race on the raw SQS body. 
+ const wireQueueName = `${SqsPermissionConsumer.QUEUE_NAME}-roundtrip-wire` + await testAdmin.deleteQueues(wireQueueName) + const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + creationConfig: { queue: { QueueName: wireQueueName } }, + }) + await wirePublisher.init() + await wirePublisher.publish(message) + + const { Messages: wireMessages } = await diContainer.cradle.sqsClient.send( + new ReceiveMessageCommand({ + QueueUrl: wirePublisher.queueProps.url, + MaxNumberOfMessages: 1, + WaitTimeSeconds: 5, + }), + ) + const envelope = JSON.parse(wireMessages![0]!.Body!) as Record + expect(envelope.__mqtCodec).toBe(MessageCodecEnum.ZSTD) + expect(typeof envelope.__mqtData).toBe('string') + const compressedBytes = Buffer.from(envelope.__mqtData as string, 'base64') + expect(compressedBytes.subarray(0, 4)).toEqual(Buffer.from([0x28, 0xb5, 0x2f, 0xfd])) + await wirePublisher.close() + + // Round-trip assertion: consumer receives and decompresses the message correctly. + await publisher.publish(message) + const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.message).toMatchObject(message) + }) + + it('published SQS message body is a codec envelope containing valid zstd bytes', async () => { + // Use an isolated queue with no consumer so we can read the raw message without a race + const wireQueueName = `${SqsPermissionConsumer.QUEUE_NAME}-wire-check` + await testAdmin.deleteQueues(wireQueueName) + + const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + creationConfig: { queue: { QueueName: wireQueueName } }, + }) + await wirePublisher.init() + + // Message must exceed the default skipCompressionBelow (512 bytes) for compression to fire. 
+ const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'codec-wire-1', + messageType: 'add', + metadata: { padding: LARGE_PADDING }, + } + await wirePublisher.publish(message) + + // Read the raw message directly from SQS — no consumer is running on this queue + const { Messages } = await diContainer.cradle.sqsClient.send( + new ReceiveMessageCommand({ + QueueUrl: wirePublisher.queueProps.url, + MaxNumberOfMessages: 1, + WaitTimeSeconds: 5, + }), + ) + expect(Messages, 'Expected a message to be in the queue').toBeDefined() + expect(Messages!.length).toBe(1) + + // Body must be a self-describing codec envelope, not raw message JSON + const envelope = JSON.parse(Messages![0]!.Body!) as Record + expect(envelope.__mqtCodec).toBe(MessageCodecEnum.ZSTD) + expect(typeof envelope.__mqtData).toBe('string') + + // __mqtData must decode to a valid zstd frame: magic number 0xFD2FB528 (LE → 28 B5 2F FD) + const compressed = Buffer.from(envelope.__mqtData as string, 'base64') + expect(compressed.subarray(0, 4)).toEqual(Buffer.from([0x28, 0xb5, 0x2f, 0xfd])) + + await wirePublisher.close() + }) + + it('consumer correctly handles multiple compressed messages in sequence', async () => { + // Messages are padded to exceed the default skipCompressionBelow (512 bytes). + const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = [ + { id: 'codec-seq-1', messageType: 'add', metadata: { padding: LARGE_PADDING } }, + { id: 'codec-seq-2', messageType: 'add', metadata: { padding: LARGE_PADDING } }, + { id: 'codec-seq-3', messageType: 'add', metadata: { padding: LARGE_PADDING } }, + ] + + // Wire assertion: verify each message is actually compressed on the wire. + // Uses an isolated queue with no consumer to avoid a race on the raw SQS body. 
+ const wireQueueName = `${SqsPermissionConsumer.QUEUE_NAME}-seq-wire` + await testAdmin.deleteQueues(wireQueueName) + const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + creationConfig: { queue: { QueueName: wireQueueName } }, + }) + await wirePublisher.init() + + for (const msg of messages) { + await wirePublisher.publish(msg) + } + + const { Messages: wireMessages } = await diContainer.cradle.sqsClient.send( + new ReceiveMessageCommand({ + QueueUrl: wirePublisher.queueProps.url, + MaxNumberOfMessages: 10, + WaitTimeSeconds: 5, + }), + ) + expect(wireMessages).toHaveLength(messages.length) + for (const raw of wireMessages!) { + const envelope = JSON.parse(raw.Body!) as Record + expect(envelope.__mqtCodec).toBe(MessageCodecEnum.ZSTD) + expect(typeof envelope.__mqtData).toBe('string') + const compressedBytes = Buffer.from(envelope.__mqtData as string, 'base64') + expect(compressedBytes.subarray(0, 4)).toEqual(Buffer.from([0x28, 0xb5, 0x2f, 0xfd])) + } + await wirePublisher.close() + + // Round-trip assertion: consumer receives and decompresses all messages correctly. 
+ for (const msg of messages) { + await publisher.publish(msg) + } + + for (const msg of messages) { + const result = await consumer.handlerSpy.waitForMessageWithId(msg.id, 'consumed') + expect(result.message).toMatchObject(msg) + } + }) + + it('consumer decompresses a message compressed externally with zstd', async () => { + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'codec-external-1', + messageType: 'add', + metadata: { source: 'external-compressor' }, + } + + // Simulate a publisher that compressed the message itself + const compressedBody = await compressToZstdEnvelope(message) + await diContainer.cradle.sqsClient.send( + new SendMessageCommand({ + QueueUrl: consumer.queueProps.url, + MessageBody: compressedBody, + }), + ) + + const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.message).toMatchObject(message) + }) + + it('consumer without codec option still decompresses zstd messages (auto-detection)', async () => { + // Message is padded to exceed the default skipCompressionBelow (512 bytes) so that + // the publisher actually compresses it. Without padding the message would be sent as + // plain JSON and the auto-detect consumer would succeed trivially via normal parsing — + // which would not prove decompression is working. + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'codec-auto-detect-1', + messageType: 'add', + metadata: { padding: LARGE_PADDING }, + } + + // Wire assertion: verify the publisher actually sends a codec envelope. + // The auto-detect consumer would succeed on plain JSON too, so we need this + // to prove decompression is actually happening rather than plain JSON parsing. 
+ const wireQueueName = `${SqsPermissionConsumer.QUEUE_NAME}-auto-detect-wire` + await testAdmin.deleteQueues(wireQueueName) + const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + creationConfig: { queue: { QueueName: wireQueueName } }, + }) + await wirePublisher.init() + await wirePublisher.publish(message) + + const { Messages: wireMessages } = await diContainer.cradle.sqsClient.send( + new ReceiveMessageCommand({ + QueueUrl: wirePublisher.queueProps.url, + MaxNumberOfMessages: 1, + WaitTimeSeconds: 5, + }), + ) + const envelope = JSON.parse(wireMessages![0]!.Body!) as Record + expect(envelope.__mqtCodec).toBe(MessageCodecEnum.ZSTD) + expect(typeof envelope.__mqtData).toBe('string') + await wirePublisher.close() + + // Round-trip assertion: consumer WITHOUT codec auto-detects the envelope and decompresses. + // Use a dedicated queue so only autoConsumer polls it — avoids both the race + // condition (shared queue) and localstack long-poll timing issues (abort + restart). 
+ const autoQueueName = `${SqsPermissionConsumer.QUEUE_NAME}-auto-detect` + await testAdmin.deleteQueues(autoQueueName) + + const autoPublisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + creationConfig: { queue: { QueueName: autoQueueName } }, + }) + await autoPublisher.init() + + // Consumer without codec — auto-detects from envelope __mqtCodec field + const autoConsumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { queue: { QueueName: autoQueueName } }, + deletionConfig: { deleteIfExists: false }, + }) + await autoConsumer.start() + + await autoPublisher.publish(message) + + const result = await autoConsumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.message).toMatchObject(message) + + await autoPublisher.close() + await autoConsumer.close(true) + }, 15000) +}) + +describe('SqsPermissionConsumer - skipCompressionBelow', () => { + let diContainer: AwilixContainer + let testAdmin: TestAwsResourceAdmin + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + testAdmin = diContainer.cradle.testAdmin + }) + + afterAll(async () => { + const { awilixManager } = diContainer.cradle + await awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('sends plain JSON for small messages by default (no skipCompressionBelow set)', async () => { + const queueName = `${SqsPermissionConsumer.QUEUE_NAME}-default-skip` + await testAdmin.deleteQueues(queueName) + + // No skipCompressionBelow — default of 512 applies. + // The small message (well under 512 bytes) must be sent as plain JSON. 
+ const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + creationConfig: { queue: { QueueName: queueName } }, + }) + await wirePublisher.init() + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'default-skip-1', + messageType: 'add', + } + await wirePublisher.publish(message) + + const { Messages } = await diContainer.cradle.sqsClient.send( + new ReceiveMessageCommand({ + QueueUrl: wirePublisher.queueProps.url, + MaxNumberOfMessages: 1, + WaitTimeSeconds: 5, + }), + ) + expect(Messages, 'Expected a message to be in the queue').toBeDefined() + expect(Messages!.length).toBe(1) + + const body = JSON.parse(Messages![0]!.Body!) as Record + expect(body.__mqtCodec).toBeUndefined() + expect(body.__mqtData).toBeUndefined() + expect(body.id).toBe(message.id) + + await wirePublisher.close() + }) + + it('sends plain JSON when message is smaller than skipCompressionBelow', async () => { + const queueName = `${SqsPermissionConsumer.QUEUE_NAME}-skip-below` + await testAdmin.deleteQueues(queueName) + + // skipCompressionBelow set very high — small message is never compressed + const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + skipCompressionBelow: 99_999, + creationConfig: { queue: { QueueName: queueName } }, + }) + await wirePublisher.init() + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'skip-below-1', + messageType: 'add', + } + await wirePublisher.publish(message) + + const { Messages } = await diContainer.cradle.sqsClient.send( + new ReceiveMessageCommand({ + QueueUrl: wirePublisher.queueProps.url, + MaxNumberOfMessages: 1, + WaitTimeSeconds: 5, + }), + ) + expect(Messages, 'Expected a message to be in the queue').toBeDefined() + expect(Messages!.length).toBe(1) + + const body = JSON.parse(Messages![0]!.Body!) 
as Record + expect(body.__mqtCodec).toBeUndefined() + expect(body.__mqtData).toBeUndefined() + expect(body.id).toBe(message.id) + + await wirePublisher.close() + }) + + it('compresses when skipCompressionBelow is 0 (always compress)', async () => { + const queueName = `${SqsPermissionConsumer.QUEUE_NAME}-always-compress` + await testAdmin.deleteQueues(queueName) + + // skipCompressionBelow: 0 disables the floor — every message is compressed + const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + skipCompressionBelow: 0, + creationConfig: { queue: { QueueName: queueName } }, + }) + await wirePublisher.init() + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'always-compress-1', + messageType: 'add', + } + await wirePublisher.publish(message) + + const { Messages } = await diContainer.cradle.sqsClient.send( + new ReceiveMessageCommand({ + QueueUrl: wirePublisher.queueProps.url, + MaxNumberOfMessages: 1, + WaitTimeSeconds: 5, + }), + ) + expect(Messages, 'Expected a message to be in the queue').toBeDefined() + expect(Messages!.length).toBe(1) + + const envelope = JSON.parse(Messages![0]!.Body!) 
as Record + expect(envelope.__mqtCodec).toBe(MessageCodecEnum.ZSTD) + expect(typeof envelope.__mqtData).toBe('string') + + await wirePublisher.close() + }) + + it('consumer receives and processes a message sent as plain JSON due to skipCompressionBelow', async () => { + const queueName = `${SqsPermissionConsumer.QUEUE_NAME}-skip-consumer` + await testAdmin.deleteQueues(queueName) + + const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + skipCompressionBelow: 99_999, + creationConfig: { queue: { QueueName: queueName } }, + }) + await wirePublisher.init() + + const wireConsumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { queue: { QueueName: queueName } }, + deletionConfig: { deleteIfExists: false }, + }) + await wireConsumer.start() + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'skip-consumer-1', + messageType: 'add', + metadata: { info: 'plain json path' }, + } + await wirePublisher.publish(message) + + const result = await wireConsumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.message).toMatchObject(message) + + await wirePublisher.close() + await wireConsumer.close(true) + }) + + it('consumer with disableCodecAutoDetection passes codec-shaped message through as plain JSON', async () => { + // A publisher with skipCompressionBelow set very high sends plain JSON even though + // codec is configured — the body will NOT be a codec envelope, so this test + // verifies that disableCodecAutoDetection does not break the plain-JSON path. + // More importantly: if the message body were a real envelope, the consumer would + // normally auto-detect and decompress it; with the flag set it must not do so. 
+ const queueName = `${SqsPermissionConsumer.QUEUE_NAME}-disable-auto-detect` + await testAdmin.deleteQueues(queueName) + + // Publisher sends plain JSON (skipCompressionBelow prevents compression) + const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + skipCompressionBelow: 99_999, + creationConfig: { queue: { QueueName: queueName } }, + }) + await wirePublisher.init() + + // Consumer opts out of auto-detection + const noAutoConsumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { queue: { QueueName: queueName } }, + deletionConfig: { deleteIfExists: false }, + disableCodecAutoDetection: true, + }) + await noAutoConsumer.start() + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'disable-auto-detect-1', + messageType: 'add', + metadata: { info: 'plain json, no auto-detect' }, + } + await wirePublisher.publish(message) + + const result = await noAutoConsumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.message).toMatchObject(message) + + await wirePublisher.close() + await noAutoConsumer.close(true) + }) + + it('consumer with disableCodecAutoDetection:true does not decompress a real zstd envelope', async () => { + // When disableCodecAutoDetection is true, a real codec envelope reaching the consumer + // must NOT be decompressed — the raw envelope object is passed to schema validation, + // which fails because { __mqtCodec, __mqtData } has no `messageType` field. 
+ const queueName = `${SqsPermissionConsumer.QUEUE_NAME}-disable-real-envelope` + await testAdmin.deleteQueues(queueName) + + const noAutoConsumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { queue: { QueueName: queueName } }, + deletionConfig: { deleteIfExists: false }, + disableCodecAutoDetection: true, + }) + await noAutoConsumer.start() + + // Build a real zstd envelope and inject it directly into the queue + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'disable-real-envelope-1', + messageType: 'add', + } + const compressedBody = await compressToZstdEnvelope(message) + await diContainer.cradle.sqsClient.send( + new SendMessageCommand({ + QueueUrl: noAutoConsumer.queueProps.url, + MessageBody: compressedBody, + }), + ) + + // The envelope fails schema validation — consumer records it as an error, not consumed. + await waitAndRetry(() => noAutoConsumer.handlerSpy.counts.error > 0, 100, 20) + expect(noAutoConsumer.handlerSpy.counts.error).toBeGreaterThan(0) + expect(noAutoConsumer.addCounter).toBe(0) + + await noAutoConsumer.close(true) + }, 15000) +}) + +// --------------------------------------------------------------------------- +// Custom codec — passthrough handler registered via { name, handler } form +// --------------------------------------------------------------------------- + +/** + * Identity codec: compress/decompress are no-ops, createCompressStream returns a PassThrough. + * Lets us verify the full custom-codec path without a real compression library in tests. 
+ */ +class NoopCodecHandler implements MessageCodecHandler { + compress(data: Buffer): Promise { + return Promise.resolve(data) + } + decompress(data: Buffer): Promise { + return Promise.resolve(data) + } + createCompressStream(): Transform { + return new PassThrough() + } +} + +describe('SqsPermissionConsumer - custom codec registration', () => { + const CUSTOM_CODEC_NAME = 'noop' + + let diContainer: AwilixContainer + let testAdmin: TestAwsResourceAdmin + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + testAdmin = diContainer.cradle.testAdmin + }) + + afterAll(async () => { + const { awilixManager } = diContainer.cradle + await awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('publisher wraps payload in an envelope with the custom codec name', async () => { + const queueName = `${SqsPermissionConsumer.QUEUE_NAME}-custom-codec-wire` + await testAdmin.deleteQueues(queueName) + + const codec = { name: CUSTOM_CODEC_NAME, handler: new NoopCodecHandler() } + const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, { + codec, + skipCompressionBelow: 0, // always compress so the envelope is always produced + creationConfig: { queue: { QueueName: queueName } }, + }) + await wirePublisher.init() + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'custom-codec-wire-1', + messageType: 'add', + } + await wirePublisher.publish(message) + + const { Messages } = await diContainer.cradle.sqsClient.send( + new ReceiveMessageCommand({ + QueueUrl: wirePublisher.queueProps.url, + MaxNumberOfMessages: 1, + WaitTimeSeconds: 5, + }), + ) + expect(Messages).toBeDefined() + expect(Messages!.length).toBe(1) + + const envelope = JSON.parse(Messages![0]!.Body!) as Record + // Envelope must carry the user-supplied codec name, not 'zstd'. 
+ expect(envelope.__mqtCodec).toBe(CUSTOM_CODEC_NAME) + expect(typeof envelope.__mqtData).toBe('string') + // Since the handler is a no-op, __mqtData decodes to the original JSON. + const decoded = Buffer.from(envelope.__mqtData as string, 'base64').toString('utf8') + expect(JSON.parse(decoded)).toMatchObject(message) + + await wirePublisher.close() + }) + + it('consumer configured with the same custom codec decompresses and processes the message', async () => { + const queueName = `${SqsPermissionConsumer.QUEUE_NAME}-custom-codec-roundtrip` + await testAdmin.deleteQueues(queueName) + + const codec = { name: CUSTOM_CODEC_NAME, handler: new NoopCodecHandler() } + + const publisher = new SqsPermissionPublisher(diContainer.cradle, { + codec, + skipCompressionBelow: 0, + creationConfig: { queue: { QueueName: queueName } }, + }) + await publisher.init() + + const consumer = new SqsPermissionConsumer(diContainer.cradle, { + codecs: [codec], + creationConfig: { queue: { QueueName: queueName } }, + deletionConfig: { deleteIfExists: false }, + }) + await consumer.start() + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'custom-codec-roundtrip-1', + messageType: 'add', + metadata: { info: 'custom codec round-trip' }, + } + await publisher.publish(message) + + const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.message).toMatchObject(message) + + await publisher.close() + await consumer.close(true) + }) + + it('consumer without the custom codec rejects envelopes with that codec name', async () => { + // A consumer without extra codecs registers only the built-in codecs (e.g. zstd), not + // 'noop'. An envelope-shaped body naming an unregistered codec is a misconfiguration: + // the consumer surfaces it as an error rather than letting the envelope's preserved + // sibling fields (id, messageType) satisfy the schema and be processed as an + // incomplete message. 
So an error is recorded and the message is never handled — + // addCounter stays at 0. + const queueName = `${SqsPermissionConsumer.QUEUE_NAME}-custom-codec-no-autodetect` + await testAdmin.deleteQueues(queueName) + + const codec = { name: CUSTOM_CODEC_NAME, handler: new NoopCodecHandler() } + + const publisher = new SqsPermissionPublisher(diContainer.cradle, { + codec, + skipCompressionBelow: 0, + creationConfig: { queue: { QueueName: queueName } }, + }) + await publisher.init() + + // Consumer with no extra codecs — only built-in codecs (e.g. zstd) are registered, not 'noop' + const zstdConsumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { queue: { QueueName: queueName } }, + deletionConfig: { deleteIfExists: false }, + }) + await zstdConsumer.start() + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'custom-codec-no-autodetect-1', + messageType: 'add', + } + await publisher.publish(message) + + // The unregistered-codec envelope is rejected as an error; wait for that error to be + // recorded, then confirm the handler never ran (no incomplete message was processed). 
+ await waitAndRetry(() => zstdConsumer.handlerSpy.counts.error > 0, 100, 20) + expect(zstdConsumer.handlerSpy.counts.error).toBeGreaterThan(0) + expect(zstdConsumer.addCounter).toBe(0) + + await publisher.close() + await zstdConsumer.close(true) + }, 10000) +}) diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts index c53ff481..15467c92 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts @@ -1,7 +1,7 @@ import type { S3 } from '@aws-sdk/client-s3' -import { SendMessageCommand } from '@aws-sdk/client-sqs' +import { ReceiveMessageCommand, SendMessageCommand } from '@aws-sdk/client-sqs' import type { SinglePayloadStoreConfig } from '@message-queue-toolkit/core' -import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core' +import { MessageCodecEnum, MessageHandlerConfigBuilder } from '@message-queue-toolkit/core' import { S3PayloadStore } from '@message-queue-toolkit/s3-payload-store' import { OFFLOADED_PAYLOAD_SIZE_ATTRIBUTE } from '@message-queue-toolkit/sqs' import type { AwilixContainer } from 'awilix' @@ -13,7 +13,7 @@ import { AbstractSqsConsumer } from '../../lib/sqs/AbstractSqsConsumer.ts' import { AbstractSqsPublisher } from '../../lib/sqs/AbstractSqsPublisher.ts' import { SQS_MESSAGE_MAX_SIZE } from '../../lib/sqs/AbstractSqsService.ts' import { SqsPermissionPublisher } from '../publishers/SqsPermissionPublisher.ts' -import { putObjectContent, waitForS3Objects } from '../utils/s3Utils.ts' +import { getObjectBuffer, putObjectContent, waitForS3Objects } from '../utils/s3Utils.ts' import type { TestAwsResourceAdmin } from '../utils/testAdmin.ts' import type { Dependencies } from '../utils/testContext.ts' import { registerDependencies } from '../utils/testContext.ts' @@ -401,3 +401,163 @@ 
describe('SqsPermissionConsumer - nested messageTypePath with payload offloading }) }) }) + +describe('SqsPermissionConsumer - codec + payload offloading', () => { + const s3BucketName = 'test-bucket-codec' + // Threshold low enough that even a small compressed payload triggers offloading + const smallThreshold = 10 + + let diContainer: AwilixContainer + let s3: S3 + let testAdmin: TestAwsResourceAdmin + let payloadStoreConfig: SinglePayloadStoreConfig + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + s3 = diContainer.cradle.s3 + testAdmin = diContainer.cradle.testAdmin + + await testAdmin.createBucket(s3BucketName) + payloadStoreConfig = { + messageSizeThreshold: smallThreshold, + store: new S3PayloadStore(diContainer.cradle, { bucketName: s3BucketName }), + storeName: 's3', + } + }) + + afterAll(async () => { + await testAdmin.emptyBuckets(s3BucketName) + const { awilixManager } = diContainer.cradle + await awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('S3 object is raw zstd binary and SQS message carries a plain pointer (not a codec envelope)', async () => { + // Use an isolated queue with no consumer so we can read the raw SQS message without a race + const wireQueueName = 'codec-offload-wire-check' + await testAdmin.deleteQueues(wireQueueName) + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'codec-offload-wire-1', + messageType: 'add', + metadata: { info: 'wire format check' }, + } + + const wirePublisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + skipCompressionBelow: 0, // always compress: this suite exercises the codec + offload path + payloadStoreConfig, + creationConfig: { queue: { QueueName: wireQueueName } }, + }) + await wirePublisher.init() + await wirePublisher.publish(message) + + // Read the raw SQS message before any consumer touches it + const 
{ Messages } = await diContainer.cradle.sqsClient.send( + new ReceiveMessageCommand({ + QueueUrl: wirePublisher.queueProps.url, + MaxNumberOfMessages: 1, + WaitTimeSeconds: 5, + }), + ) + expect(Messages, 'Expected a message to be in the queue').toBeDefined() + expect(Messages!.length).toBe(1) + + // SQS body must be a plain JSON pointer — not a codec envelope. + // Compressed bytes live in S3; only the pointer is sent inline. + const sqsBody = JSON.parse(Messages![0]!.Body!) as Record + expect( + sqsBody.__mqtCodec, + 'SQS body must not be a codec envelope when offloading', + ).toBeUndefined() + expect(sqsBody.payloadRef, 'SQS body must contain a payloadRef pointer').toBeDefined() + const payloadRef = sqsBody.payloadRef as Record + expect(payloadRef.codec).toBe(MessageCodecEnum.ZSTD) + + // S3 object must be raw compressed binary, not a JSON codec envelope. + // zstd frames start with magic number 0xFD2FB528 (little-endian: 28 B5 2F FD). + const s3Keys = await waitForS3Objects(s3, s3BucketName, 1, 5000) + expect(s3Keys.length).toBeGreaterThan(0) + const s3Bytes = await getObjectBuffer(s3, s3BucketName, s3Keys[0]!) 
+ expect(s3Bytes.subarray(0, 4)).toEqual(Buffer.from([0x28, 0xb5, 0x2f, 0xfd])) + + await wirePublisher.close() + }, 30_000) + + it('compresses payload, offloads to S3 as raw binary, and consumer decompresses correctly', async () => { + const queueName = 'codec-offload-roundtrip' + await testAdmin.deleteQueues(queueName) + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'codec-offload-1', + messageType: 'add', + metadata: { info: 'compressed and offloaded' }, + } + + const publisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + skipCompressionBelow: 0, // always compress: this suite exercises the codec + offload path + payloadStoreConfig, + creationConfig: { queue: { QueueName: queueName } }, + }) + // No codec on consumer — codec is read from payloadRef.codec in the pointer + const consumer = new SqsPermissionConsumer(diContainer.cradle, { + payloadStoreConfig, + creationConfig: { queue: { QueueName: queueName } }, + deletionConfig: { deleteIfExists: false }, + }) + + await publisher.init() + await consumer.start() + await publisher.publish(message) + + // Verify payload was offloaded to S3 + const s3Keys = await waitForS3Objects(s3, s3BucketName, 1, 5000) + expect(s3Keys.length).toBeGreaterThan(0) + + // Verify consumer receives the correct decompressed payload + const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.message).toMatchObject(message) + + await publisher.close() + await consumer.close(true) + }, 30_000) + + it('consumer without explicit codec still decompresses codec-offloaded payload', async () => { + const queueName = 'codec-offload-auto-detect' + await testAdmin.deleteQueues(queueName) + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: 'codec-offload-auto-1', + messageType: 'add', + metadata: { info: 'auto-detect codec from pointer' }, + } + + const publisher = new SqsPermissionPublisher(diContainer.cradle, { + codec: MessageCodecEnum.ZSTD, + 
skipCompressionBelow: 0, // always compress: this suite exercises the codec + offload path + payloadStoreConfig, + creationConfig: { queue: { QueueName: queueName } }, + }) + // Consumer has no explicit codec — should still work because codec comes from payloadRef.codec + const consumer = new SqsPermissionConsumer(diContainer.cradle, { + payloadStoreConfig, + creationConfig: { queue: { QueueName: queueName } }, + deletionConfig: { deleteIfExists: false }, + }) + + await publisher.init() + await consumer.start() + + await publisher.publish(message) + + const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.message).toMatchObject(message) + + await publisher.close() + await consumer.close(true) + }, 30_000) +}) diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.ts index 26618288..f6415d9e 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.ts @@ -32,6 +32,8 @@ type SqsPermissionConsumerOptions = Pick< | 'payloadStoreConfig' | 'messageDeduplicationConfig' | 'enableConsumerDeduplication' + | 'codecs' + | 'disableCodecAutoDetection' > & { addPreHandlerBarrier?: ( message: SupportedMessages, @@ -128,6 +130,8 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< payloadStoreConfig: options.payloadStoreConfig, messageDeduplicationConfig: options.messageDeduplicationConfig, enableConsumerDeduplication: options.enableConsumerDeduplication, + codecs: options.codecs, + disableCodecAutoDetection: options.disableCodecAutoDetection, messageDeduplicationIdField: 'deduplicationId', messageDeduplicationOptionsField: 'deduplicationOptions', handlers: new MessageHandlerConfigBuilder< diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.ts index 33ab0c15..88010643 100644 --- 
a/packages/sqs/test/publishers/SqsPermissionPublisher.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.ts @@ -31,6 +31,8 @@ export class SqsPermissionPublisher extends AbstractSqsPublisher, ) { super(dependencies, { @@ -53,6 +55,8 @@ export class SqsPermissionPublisher extends AbstractSqsPublisher { + const result = await s3.getObject({ Bucket: bucket, Key: key }) + const bytes = await result.Body?.transformToByteArray() + if (!bytes) throw new Error(`No body for S3 object ${key}`) + return Buffer.from(bytes) +} + export async function putObjectContent(s3: S3, bucket: string, key: string, content: string) { await s3.putObject({ Bucket: bucket, Key: key, Body: content }) } diff --git a/packages/sqs/vitest.bench.config.ts b/packages/sqs/vitest.bench.config.ts new file mode 100644 index 00000000..fd4e580e --- /dev/null +++ b/packages/sqs/vitest.bench.config.ts @@ -0,0 +1,14 @@ +import { defineConfig } from 'vitest/config' + +// biome-ignore lint/style/noDefaultExport: vite expects default export +export default defineConfig({ + test: { + globals: true, + watch: false, + mockReset: true, + pool: 'threads', + maxWorkers: 1, + setupFiles: ['test/utils/vitest.setup.ts'], + include: ['bench/**/*.ts'], + }, +}) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4826d0c7..7b3ecc48 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -31,8 +31,8 @@ importers: specifier: ^3.0.0 version: 3.1.0 '@message-queue-toolkit/core': - specifier: '*' - version: 25.5.0(zod@4.4.3) + specifier: workspace:* + version: link:../core '@types/amqplib': specifier: 0.10.8 version: 0.10.8 @@ -150,14 +150,14 @@ importers: specifier: ^3.0.0 version: 3.1.0 '@message-queue-toolkit/core': - specifier: '*' - version: 25.5.0(zod@4.4.3) + specifier: workspace:* + version: link:../core '@message-queue-toolkit/gcs-payload-store': specifier: '*' - version: 1.0.0(@google-cloud/storage@7.19.0)(@message-queue-toolkit/core@25.5.0(zod@4.4.3)) + version: 
1.0.0(@google-cloud/storage@7.19.0)(@message-queue-toolkit/core@packages+core) '@message-queue-toolkit/redis-message-deduplication-store': specifier: '*' - version: 2.0.2(@message-queue-toolkit/core@25.5.0(zod@4.4.3))(ioredis@5.10.1)(zod@4.4.3) + version: 2.0.2(@message-queue-toolkit/core@packages+core)(ioredis@5.10.1)(zod@4.4.3) '@message-queue-toolkit/schemas': specifier: '*' version: 7.1.0(zod@4.4.3) @@ -498,14 +498,14 @@ importers: specifier: ^3.0.0 version: 3.1.0 '@message-queue-toolkit/core': - specifier: '*' - version: 25.5.0(zod@4.4.3) + specifier: workspace:* + version: link:../core '@message-queue-toolkit/redis-message-deduplication-store': - specifier: '*' - version: 2.0.2(@message-queue-toolkit/core@25.5.0(zod@4.4.3))(ioredis@5.10.1)(zod@4.4.3) + specifier: workspace:* + version: link:../redis-message-deduplication-store '@message-queue-toolkit/s3-payload-store': - specifier: '*' - version: 3.0.0(@aws-sdk/client-s3@3.1048.0)(@message-queue-toolkit/core@25.5.0(zod@4.4.3)) + specifier: workspace:* + version: link:../s3-payload-store '@message-queue-toolkit/sqs': specifier: workspace:* version: link:../sqs @@ -565,17 +565,17 @@ importers: specifier: ^3.0.0 version: 3.1.0 '@message-queue-toolkit/core': - specifier: '*' - version: 25.5.0(zod@4.4.3) + specifier: workspace:* + version: link:../core '@message-queue-toolkit/redis-message-deduplication-store': - specifier: '*' - version: 2.0.2(@message-queue-toolkit/core@25.5.0(zod@4.4.3))(ioredis@5.10.1)(zod@4.4.3) + specifier: workspace:* + version: link:../redis-message-deduplication-store '@message-queue-toolkit/s3-payload-store': - specifier: '*' - version: 3.0.0(@aws-sdk/client-s3@3.1048.0)(@message-queue-toolkit/core@25.5.0(zod@4.4.3)) + specifier: workspace:* + version: link:../s3-payload-store '@message-queue-toolkit/schemas': - specifier: '*' - version: 7.1.0(zod@4.4.3) + specifier: workspace:* + version: link:../schemas '@types/node': specifier: ^25.0.2 version: 25.8.0 @@ -651,6 +651,7 @@ packages: 
'@aws-sdk/core@3.974.11': resolution: {integrity: sha512-QpnINq5FZH6EOaDEkmHdT7eUunbvD27pDNQypaWjFyYz7Zl1q3UCMQErBZxpmfGfI7MvI2TlK8KTkgNpv8b1ug==} engines: {node: '>=20.0.0'} + deprecated: Deprecated due to an error deserialization bug in JSON 1.0 protocol services, see https://github.com/aws/aws-sdk-js-v3/pull/8031. Newer version available. '@aws-sdk/crc64-nvme@3.972.8': resolution: {integrity: sha512-fVfUCL/Xh2zINYMPZvj+iBn6XWouQf0DAnjaWCI9MkmqXzL2Iy5FoQB8O7syFe6gN6AH1ecDDU58T51Ou0kFkA==} @@ -991,12 +992,6 @@ packages: '@message-queue-toolkit/core': '>=23.1.0' ioredis: ^5.3.2 - '@message-queue-toolkit/s3-payload-store@3.0.0': - resolution: {integrity: sha512-AX2PI74CN9CBQWHT/nJBhUPR8E6beGodTsuSSlZ/zQvy6ViDcI4gEKxFViqKR2xai7PeLsqw+HWdkXhawwEqYA==} - peerDependencies: - '@aws-sdk/client-s3': ^3.596.0 - '@message-queue-toolkit/core': '>=24.0.0' - '@message-queue-toolkit/schemas@7.1.0': resolution: {integrity: sha512-JAzSQAHouympK/cEDBxsfEuS2Ifu1pv0a/NRvhNWfFlgW0TmsWT7SkYNERA7x89OK7PGk9PyDN88cV9l0gZ22Q==} peerDependencies: @@ -3350,26 +3345,21 @@ snapshots: toad-cache: 3.7.0 zod: 4.4.3 - '@message-queue-toolkit/gcs-payload-store@1.0.0(@google-cloud/storage@7.19.0)(@message-queue-toolkit/core@25.5.0(zod@4.4.3))': + '@message-queue-toolkit/gcs-payload-store@1.0.0(@google-cloud/storage@7.19.0)(@message-queue-toolkit/core@packages+core)': dependencies: '@google-cloud/storage': 7.19.0 - '@message-queue-toolkit/core': 25.5.0(zod@4.4.3) + '@message-queue-toolkit/core': link:packages/core - '@message-queue-toolkit/redis-message-deduplication-store@2.0.2(@message-queue-toolkit/core@25.5.0(zod@4.4.3))(ioredis@5.10.1)(zod@4.4.3)': + '@message-queue-toolkit/redis-message-deduplication-store@2.0.2(@message-queue-toolkit/core@packages+core)(ioredis@5.10.1)(zod@4.4.3)': dependencies: '@lokalise/node-core': 14.8.1(zod@4.4.3) - '@message-queue-toolkit/core': 25.5.0(zod@4.4.3) + '@message-queue-toolkit/core': link:packages/core ioredis: 5.10.1 redis-semaphore: 5.7.0(ioredis@5.10.1) 
transitivePeerDependencies: - supports-color - zod - '@message-queue-toolkit/s3-payload-store@3.0.0(@aws-sdk/client-s3@3.1048.0)(@message-queue-toolkit/core@25.5.0(zod@4.4.3))': - dependencies: - '@aws-sdk/client-s3': 3.1048.0 - '@message-queue-toolkit/core': 25.5.0(zod@4.4.3) - '@message-queue-toolkit/schemas@7.1.0(zod@4.4.3)': dependencies: zod: 4.4.3