Skip to content

feat: add zstd message compression codec for SNS and SQS#442

Merged
kibertoad merged 24 commits into
kibertoad:mainfrom
irfanh94:main
May 22, 2026
Merged

feat: add zstd message compression codec for SNS and SQS#442
kibertoad merged 24 commits into
kibertoad:mainfrom
irfanh94:main

Conversation

@irfanh94
Copy link
Copy Markdown
Contributor

@irfanh94 irfanh94 commented May 18, 2026

Summary

Adds opt-in message compression for the SQS and SNS adapters.

  • A codec option on SQS/SNS publishers compresses outgoing message bodies. Built-in zstd uses the Node.js built-in zlib module (requires Node.js >=22.15.0) — no native dependency, and nothing extra to install. Custom codecs are supported via the { name, handler } form.
  • Compressed bodies are wrapped in a self-describing envelope { ...routingFields, "__mqtCodec": "zstd", "__mqtData": "<base64>" }. Identity/routing fields (id, type, timestamp, deduplication id/options) are copied as plaintext siblings so broker-side filtering (e.g. SNS body-scoped FilterPolicy) keeps working on compressed messages.
  • Consumers auto-detect codec envelopes and decompress transparently before schema validation — built-in codecs need no consumer configuration; custom codecs are registered via codecs: [...], and disableCodecAutoDetection opts out. An envelope naming an unregistered codec is surfaced as a retriable error (a misconfiguration), not dropped as poison.
  • skipCompressionBelow (default 512 bytes) skips compression for small messages that would only expand under compression.
  • Codec composes with payload offloading: the message is compressed exactly once; the offload-vs-inline decision is made against the codec envelope wire size; the compress-and-offload path streams through a temp file so large payloads are never fully materialized; payloadRef.codec records the algorithm so the consumer decompresses after retrieval with no extra configuration.
  • AMQP and Pub/Sub publishers throw at construction if codec is supplied (unsupported by those adapters).
  • ZstdCodecHandler caps decompressed size (maxDecompressedBytes, default 100 MiB) to guard against decompression bombs.

Changes

  • packages/core — new lib/codec/:
    • messageCodec.tsMessageCodecEnum; the MessageCodec / MessageCodecRegistration / MessageCodecHandler / CodecEnvelope types; hasCodecEnvelopeShape / isCodecEnvelope; KNOWN_CODECS; BASE64_RE.
    • codecHandler.tsZstdCodecHandler, buildCodecEnvelope, resolveCodecHandler, getCodecName.
    • AbstractQueueService gains the shared prepareOutgoingPayload / compressAndOffloadPayload pipeline used by all publishers; codec / skipCompressionBelow added to QueuePublisherOptions; payloadRef.codec added to the offloaded-pointer schema. The codec depends only on Node's built-in zlib, so it ships inside core — there is no separate package to install.
  • packages/sqs — publisher compresses via the shared core pipeline; AbstractSqsConsumer auto-detects codec envelopes and decompresses before schema validation, with codecs[] / disableCodecAutoDetection options; an unregistered codec raises a retriable error.
  • packages/sns — publisher compresses via the shared core pipeline; decompression flows through the shared SQS consumer path.
  • packages/gcp-pubsub, packages/amqp — publishers reject the unsupported codec option at construction time.
  • build / dev — workspace devDependencies switched from "*" to "workspace:*" for consistent local type resolution (pnpm substitutes the concrete version on publish; no impact on npm consumers). Added bench scripts/configs for codec micro- and integration benchmarks.

Test plan

  • pnpm --filter @message-queue-toolkit/core test — codec unit tests (messageCodec.spec.ts, codecHandler.spec.ts) and the AbstractQueueService offload regression suite.
  • pnpm --filter @message-queue-toolkit/sqs test — SQS codec integration suites: round-trip + wire-format assertions, message sequences, external compressor, auto-detection, skipCompressionBelow, disableCodecAutoDetection, custom-codec registration, and codec + payload offloading.
  • pnpm --filter @message-queue-toolkit/sns test — SNS→SQS codec round-trip, sequence, and auto-detection through the SNS envelope.
  • AMQP and Pub/Sub publisher guard tests; existing payload-offloading and consumer suites pass unchanged.

Summary by CodeRabbit

  • New Features

    • Add pluggable message compression (built-in zstd + custom codecs) with automatic consumer detection
    • Publisher-side single-pass compression with optional offload of large compressed payloads
    • Configurable compression threshold via skipCompressionBelow
    • New standalone codec package and benchmark suites
  • Documentation

    • Docs updated with message-compression usage, requirements, and advanced offload behavior
  • Tests

    • Comprehensive codec integration and unit tests added/expanded

Review Change Stack

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 18, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds a codec envelope API and new @message-queue-toolkit/codec package (ZSTD handler), refactors offloading to support compressed bytes and payloadRef.codec, integrates compression into SQS/SNS publishers (prepareOutgoingPayload) and consumer auto-detection/decompression, updates options/docs/tests/benchmarks, and rejects codec for unsupported transports.

Changes

Message Codec Compression Feature

Layer / File(s) Summary
Core codec types and validation
packages/core/lib/codec/messageCodec.ts, packages/core/lib/index.ts
Defines MessageCodecEnum, CodecEnvelope (__mqtCodec/__mqtData), MessageCodecHandler, MessageCodecRegistration, BASE64_RE, KNOWN_CODECS, and isCodecEnvelope runtime guard; adds unit tests.
Codec package implementation
packages/codec/lib/codec/codecHandler.ts, packages/codec/lib/index.ts, packages/codec/package.json, packages/codec/tsconfig*.json
New @message-queue-toolkit/codec package with ZstdCodecHandler (compress/decompress/createCompressStream), getCodecName/resolveCodecHandler, compressMessageBody/buildCodecEnvelope/decompressMessageBody with base64 validation, barrel export and build config.
Codec package documentation
packages/codec/README.md
README documents install, envelope format, APIs, streaming handler example, offload integration, and consumer scoping rules.
Queue options codec configuration
packages/core/lib/types/queueOptionsTypes.ts
Adds codec, skipCompressionBelow, and disableCodecAutoDetection to CommonQueueOptions with JSDoc.
AbstractQueueService offloading refactor
packages/core/lib/queues/AbstractQueueService.ts
Removes offloadMessagePayloadIfNeeded; adds offloadPayload, compressAndOffloadPayload, buildPointer (preserve type, embed payloadRef.codec), and extended retrieveOffloadedMessagePayload supporting decompress callback.
Payload storage schema and documentation
packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts, packages/core/lib/payload-store/payloadStoreTypes.ts
Adds optional codec to PAYLOAD_REF_SCHEMA; expands messageSizeThreshold JSDoc to clarify wire-size semantics with codec envelopes.
Stream utilities for decompression
packages/core/lib/utils/streamUtils.ts
Adds streamWithKnownSizeToBuffer and refactors streamWithKnownSizeToString.
SQS publisher message compression
packages/sqs/lib/sqs/AbstractSqsPublisher.ts
Adds prepareOutgoingPayload() handling codec/offload and sendMessage() accepting optional preBuiltBody.
SQS consumer message decompression
packages/sqs/lib/sqs/AbstractSqsConsumer.ts
Builds codec registry (KNOWN_CODECS + custom codecs), detects inline codec envelopes with isCodecEnvelope, resolves handlers to decompress inline/offloaded payloads, and tightens guards.
SQS test harness wiring
packages/sqs/test/...
Propagates codec/codecs and disableCodecAutoDetection through test publishers/consumers; adds S3 getObjectBuffer test helper and extensive integration tests and benchmarks.
SNS publisher message compression
packages/sns/lib/sns/AbstractSnsPublisher.ts
Adds prepareOutgoingPayload() and sendMessage() preBuiltBody support; integrates codec/offload logic for SNS publish path.
SNS consumer type refinement & tests
packages/sns/lib/sns/AbstractSnsSqsConsumer.ts, packages/sns/test/*
Refines SNSSQSConsumerOptions, adds explicit resolveMessage return type, and adds SNS integration tests for codec round-trips and auto-detect.
GCP Pub/Sub & AMQP codec rejection
packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts, packages/amqp/lib/AbstractAmqpPublisher.ts
Constructors now throw if codec option is supplied; updated Pub/Sub offload call to new API; tests added asserting rejection.
Core codec and offload unit tests
packages/core/test/codec/messageCodec.spec.ts, packages/core/test/queues/AbstractQueueService.offload.spec.ts
Adds tests for isCodecEnvelope and updates offload regression tests to use offloadPayload.
SQS codec integration tests & benchmarks
packages/sqs/test/*, packages/sqs/bench/*, packages/sqs/vitest.bench.config.ts
Integration tests for inline/offloaded zstd, skipCompressionBelow behavior, custom codec, payload offloading validation; macro and micro benchmarks for codec performance.
Docs and package manifests
packages/sqs/README.md, packages/sns/README.md, packages/sns/package.json, packages/sqs/package.json, .github/workflows/ci.yml, biome.json
Adds Message Compression docs, adds @message-queue-toolkit/codec peerDependency, updates devDependencies to workspace:*, adds bench script, updates CI package mapping, and disables console linter for bench files.

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested labels: minor

Suggested reviewers:

  • CarlosGamero
  • kjamrog

🐰 "I wrapped the bytes in zstd's song,

one compress pass and then along,
offload when too wide, inline when small,
consumers hum and decode them all."

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 37.50% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The PR title clearly and specifically describes the main change: adding zstd message compression codec support for SNS and SQS publishers/consumers.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/core/lib/codec/messageCodec.ts`:
- Around line 14-21: isCodecEnvelope currently only checks presence of
CODEC_FIELD and DATA_FIELD and that CODEC_FIELD is a supported codec, but it
doesn't assert the type of the payload so a non-string __data will later blow up
in Buffer.from; update isCodecEnvelope to also verify that (value as
Record<string, unknown>)[DATA_FIELD] is a string (and optionally that (value as
Record<string, unknown>)[CODEC_FIELD] is a string) before returning true so
downstream code (e.g., Buffer.from usage) is guaranteed a string __data.

In `@packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts`:
- Around line 82-103: The test has a race because the original consumer started
in beforeEach remains running and can steal messages from the SQS queue; modify
the spec to stop the original consumer before creating/starting the
SnsSqsPermissionConsumer instance named autoConsumer (call the appropriate
close/shutdown on the existing consumer from beforeEach), then start
autoConsumer, publish and assert via
autoConsumer.handlerSpy.waitForMessageWithId, and finally restore/close
autoConsumer; reference the existing variables/instances consumer, autoConsumer,
SnsSqsPermissionConsumer, and handlerSpy.waitForMessageWithId to locate where to
add the consumer.close() call.

In `@packages/sqs/lib/sqs/AbstractSqsConsumer.ts`:
- Around line 896-904: The current logic replaces
resolveMessageResult.result.body with the decompressed payload (using
isCodecEnvelope and decompressMessageBody), which loses the original codec
envelope needed when republishing retries; instead, preserve the original
compressed envelope by not overwriting resolveMessageResult.result.body—store
the decompressed payload on a new field (e.g., decompressedBody) or attach it to
originalMessage so downstream processing can read the decompressed content while
any retry/republish logic still serializes the original codec envelope; keep the
existing try/catch and error handling (handleError and ABORT_EARLY_EITHER) but
ensure decompressMessageBody is only used to populate the new field and never
replaces the envelope used for republish.

In `@packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts`:
- Around line 104-122: Test is flaky because the original consumer started in
beforeEach (consumer) is still polling the same queueUrl when you create
autoConsumer, so the published message may be consumed by the original consumer
instead of autoConsumer; to fix, stop the original consumer before
creating/starting autoConsumer (call and await consumer.close(true) or
consumer.stop() as appropriate) so only autoConsumer is polling this queue for
the duration of this test, then publish and await
handlerSpy.waitForMessageWithId on autoConsumer and finally restart or clean up
the original consumer if needed.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5cc1c3d3-5e4b-48db-97b1-d2d1d9701551

📥 Commits

Reviewing files that changed from the base of the PR and between e551bb5 and 0794af6.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (18)
  • packages/core/lib/codec/messageCodec.ts
  • packages/core/lib/index.ts
  • packages/core/lib/queues/AbstractQueueService.ts
  • packages/core/lib/types/queueOptionsTypes.ts
  • packages/core/package.json
  • packages/sns/lib/sns/AbstractSnsPublisher.ts
  • packages/sns/lib/sns/AbstractSnsSqsConsumer.ts
  • packages/sns/package.json
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.ts
  • packages/sns/test/publishers/SnsPermissionPublisher.ts
  • packages/sqs/lib/sqs/AbstractSqsConsumer.ts
  • packages/sqs/lib/sqs/AbstractSqsPublisher.ts
  • packages/sqs/package.json
  • packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.ts
  • packages/sqs/test/publishers/SqsPermissionPublisher.ts
  • pnpm-workspace.yaml

Comment thread packages/core/lib/codec/messageCodec.ts Outdated
Comment thread packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts
Comment thread packages/sqs/lib/sqs/AbstractSqsConsumer.ts Outdated
Comment thread packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts
Comment thread packages/core/lib/codec/messageCodec.ts Outdated
Comment thread packages/core/lib/codec/messageCodec.ts Outdated
…ls, fix test races

- Move @mongodb-js/zstd out of core into sqs; core now only defines
  MessageCodecHandler interface + pure envelope types (CodecEnvelope,
  isCodecEnvelope, MessageCodec) with no native dependencies
- Add packages/sqs/lib/codec/sqsCodecHandler.ts: ZstdCodecHandler,
  resolveCodecHandler, and the concrete compressMessageBody /
  decompressMessageBody helpers
- AbstractSqsPublisher and AbstractSqsConsumer import from local codec;
  AbstractSnsPublisher imports compressMessageBody from @message-queue-toolkit/sqs
- Strengthen isCodecEnvelope to assert typeof __data === 'string' so
  Buffer.from downstream is guaranteed a string
- Fix race condition in SQS codec auto-detection test: use a dedicated
  queue (user_permissions_multi-auto-detect) instead of sharing the
  beforeEach consumer's queue, eliminating both the steal-race and the
  localstack long-poll timing issue
- Fix race condition in SNS codec auto-detection test: stop the original
  consumer before starting autoConsumer, reassign consumer = autoConsumer
  so afterEach handles cleanup without a double-close

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment thread packages/core/lib/codec/codecHandler.ts
Comment thread packages/core/lib/queues/AbstractQueueService.ts Outdated
Irfan Hodzic and others added 2 commits May 19, 2026 09:16
…tall

The native addon requires node-gyp compilation. pnpm install runs with
--ignore-scripts in CI, so the binary is never built. pnpm rebuild
explicitly compiles it regardless of that flag.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Measures wall-clock time and msg/s for 50 messages with and without
zstd compression across small (~80 B) and large (~6 KB) payloads.
Each run deletes its queues before and after so no resources are left
behind. Run with: pnpm --filter @message-queue-toolkit/sqs bench

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment thread packages/core/lib/codec/messageCodec.ts Outdated
…nd add codec documentation

Switch from @mongodb-js/zstd (native node-gyp addon requiring Python and
a C++ toolchain) to zlib.zstdCompress/zstdDecompress built into Node.js 22+.
This removes 24 transitive packages, drops the pnpm rebuild CI step, and
eliminates native build requirements for end users of the package.

Refactor MessageCodec to use MessageCodecEnum object pattern, enabling
MessageCodecEnum.ZSTD usage alongside the plain string literal.

Add JSDoc to MessageCodecEnum, MessageCodecHandler, and the codec option
in queueOptionsTypes. Add a Message Compression section to the SQS README
with publisher/consumer examples and auto-detection behaviour, and reference
it from the SNS README.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/sqs/lib/codec/sqsCodecHandler.ts`:
- Around line 6-7: The import eagerly promisifies
zlib.zstdCompress/zstdDecompress (zstdCompress, zstdDecompress) which will throw
on Node versions before 23.8.0; add a runtime guard that checks for
zlib.zstdCompress and zlib.zstdDecompress existence before calling promisify and
throw a clear error message if missing, and update packages/sqs/package.json to
include "engines": { "node": ">=23.8.0" } (or your chosen minimum) and align the
CI matrix to only run supported Node versions; reference the symbols
zstdCompress and zstdDecompress in your guard and update related tests/CI
configs accordingly.

In `@packages/sqs/README.md`:
- Around line 848-850: Update the compression notes that currently state "256
KB" to the correct AWS SQS maximum "1 MiB" so the text about compression being
applied after schema validation and before the SQS SendMessage call, and the
note about compressed payload size limits, match the rest of the README; ensure
the wording near MessageCodecEnum.ZSTD (and any mention of SQS 256 KB) is
replaced with "1 MiB" and is consistent with other occurrences in the document.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 18893cc0-2a22-4b04-9337-32b52eb05035

📥 Commits

Reviewing files that changed from the base of the PR and between 0794af6 and c1d83e7.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (16)
  • biome.json
  • packages/core/lib/codec/messageCodec.ts
  • packages/core/lib/index.ts
  • packages/core/lib/types/queueOptionsTypes.ts
  • packages/sns/README.md
  • packages/sns/lib/sns/AbstractSnsPublisher.ts
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts
  • packages/sqs/README.md
  • packages/sqs/bench/codec.bench.ts
  • packages/sqs/lib/codec/sqsCodecHandler.ts
  • packages/sqs/lib/index.ts
  • packages/sqs/lib/sqs/AbstractSqsConsumer.ts
  • packages/sqs/lib/sqs/AbstractSqsPublisher.ts
  • packages/sqs/package.json
  • packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts
  • packages/sqs/vitest.bench.config.ts
✅ Files skipped from review due to trivial changes (1)
  • packages/sns/README.md
🚧 Files skipped from review as they are similar to previous changes (5)
  • packages/core/lib/types/queueOptionsTypes.ts
  • packages/sns/lib/sns/AbstractSnsPublisher.ts
  • packages/sqs/lib/sqs/AbstractSqsPublisher.ts
  • packages/sqs/package.json
  • packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts

Comment thread packages/codec/lib/codec/codecHandler.ts Outdated
Comment thread packages/sqs/README.md
const zstdCompress = promisify(zlib.zstdCompress)
const zstdDecompress = promisify(zlib.zstdDecompress)

export class ZstdCodecHandler implements MessageCodecHandler {
Copy link
Copy Markdown
Collaborator

@CarlosGamero CarlosGamero May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we’re no longer relying on external libraries for compression, we might consider moving this utility into the core. That would allow us to reuse it across other modules like amp or gcp-pubsup without having to reimplement it each time.

@kibertoad tagging you to get your thoughts on this, especially since you previously suggested moving it out of core 😓

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I just saw the comment about a separate package. I agree with that as well—ultimately, we’re both talking about the same thing: enabling compression support across the different technologies we support.

One open question I’m still unsure about: does it really make sense to create a separate package if we’re going to rely on Node internals anyway? If we keep it in core, customers don’t need to install additional dependencies they may not use. From that perspective, having it in core feels sufficient, but I don’t have a strong opinion either way.

Comment thread packages/sns/README.md
Comment thread pnpm-workspace.yaml Outdated
Move the zstd codec implementation (ZstdCodecHandler, compressMessageBody,
decompressMessageBody, resolveCodecHandler) from packages/sqs into a new
dedicated packages/codec package so any adapter can use compression without
depending on @message-queue-toolkit/sqs.

- Create packages/codec with package.json, tsconfigs, and lib/codec/codecHandler.ts
- Delete packages/sqs/lib/codec/sqsCodecHandler.ts
- Update sqs and sns to import from @message-queue-toolkit/codec
- Re-export codec functions from @message-queue-toolkit/sqs for backwards compatibility
- Add @message-queue-toolkit/codec as peer dependency in sqs and sns packages
- Remove @mongodb-js/zstd from pnpm-workspace.yaml allowBuilds (no longer used)
- Register packages/codec in CI PATH_TO_NAME map
- Update SQS and SNS READMEs to document codec as a separate peer dependency

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment thread packages/core/lib/types/queueOptionsTypes.ts Outdated
Comment thread packages/core/lib/types/queueOptionsTypes.ts Outdated
options: SNSMessageOptions,
): Promise<void> {
const attributes = resolveOutgoingMessageAttributes<MessageAttributeValue>(payload)
const jsonBody = JSON.stringify(payload)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json-stream-stringify is a better option here, and compression method ideally should operate on streams, not on full content

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kibertoad Is it safe to use this package: https://www.npmjs.com/package/json-stream-stringify since its been 2 years since last update.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be safe, it's likely just complete, nothing to add. we use it elsewhere in AP too

Irfan Hodzic and others added 3 commits May 19, 2026 10:57
zlib.zstdCompress/zstdDecompress were added in Node.js v22.15.0 and v23.8.0,
not v22.0.0. The previous "Node.js 22+" claim was incorrect and would cause a
cryptic TypeError at import time on v22.0.0-v22.14.x.

- Add runtime guard in codecHandler.ts that throws a clear error if zstd
  functions are missing, before promisify() is called
- Add engines: { node: ">=22.15.0" } to packages/codec/package.json
- Update all JSDoc and README references from "Node.js 22+" to ">=22.15.0"

CI matrix (22.x, 24.x) resolves to latest patches which are >=22.15.0 — no change needed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Fixes double compression: previously publish() delegated to
offloadMessagePayloadIfNeeded which compressed the payload to check the
size threshold, then returned the original message, and sendMessage()
compressed again.

Now when a codec is set, the message is compressed exactly once at
publish() entry point, regardless of whether offloading is also
configured. The same compressed Buffer is then either:
- stored in S3 and replaced with a pointer (if compressed size exceeds
  messageSizeThreshold), or
- wrapped in a codec envelope and sent inline (if it fits).

The payload is never compressed twice.

Key changes:
- codec: add buildCodecEnvelope(compressed, codec) to wrap pre-compressed
  bytes without re-compressing
- core: replace offloadMessagePayloadIfNeeded with three focused methods:
  - private buildPointer() — shared pointer construction logic
  - protected offloadPayload() — no-codec path, returns null if fits
  - protected offloadCompressedPayload() — codec path, always stores
- sqs/sns: restructure publish() via private prepareOutgoingPayload()
  that compresses once and branches; sendMessage() accepts preBuiltBody
  to skip re-serialization
- gcp-pubsub: migrate to offloadPayload(), pin core to workspace:*
- docs: update SQS, SNS, core, and codec READMEs to explain the single
  compression pass and how codec interacts with payload offloading

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
packages/core/lib/utils/streamUtils.ts (1)

13-14: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate that stream data fits within the expected size.

Buffer.copy() silently truncates if the source doesn't fit in the target buffer. If the stream contains more data than the provided size, line 13 will copy only what fits and line 14 will increment offset by the full chunk length, causing offset tracking to become inaccurate and subsequent chunks to be written at incorrect positions or dropped entirely. In the codec compression context, truncated compressed payloads will fail decompression.

🛡️ Proposed fix to detect and reject oversized streams
-    chunkBuffer.copy(buffer, offset)
-    offset += chunkBuffer.length
+    const bytesCopied = chunkBuffer.copy(buffer, offset)
+    if (bytesCopied !== chunkBuffer.length) {
+      throw new Error(
+        `Stream size exceeds expected size of ${size} bytes. Data truncation detected at offset ${offset}.`
+      )
+    }
+    offset += bytesCopied
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/lib/utils/streamUtils.ts` around lines 13 - 14, The code
currently copies chunkBuffer into buffer without checking bounds, then
increments offset by chunkBuffer.length which breaks offset tracking if the
stream exceeds the provided size; in the stream processing function in
streamUtils.ts (the block using variables buffer, offset, chunkBuffer and size)
add a pre-copy check: if offset + chunkBuffer.length > size, reject/throw an
error (or return a failed Promise) indicating the stream is oversized; otherwise
perform Buffer.copy and increment offset by the actual number of bytes copied
(or chunkBuffer.length if safe). Ensure the error/path prevents silent
truncation and surfaces the oversized-stream condition to callers.
packages/core/lib/queues/AbstractQueueService.ts (1)

835-860: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Return an explicit error when codec metadata is present but no decompressor is provided.

At Line 835, if codec exists but decompress is undefined, execution falls through and tries JSON parsing compressed bytes (Line 858), which hides the real failure mode.

🛠️ Suggested fix
     const codec = parsedPayload.payloadRef?.codec
+    if (codec && !decompress) {
+      return {
+        error: new Error(
+          `Offloaded payload is compressed with codec "${codec}" but no decompressor callback was provided`,
+        ),
+      }
+    }
+
     if (codec && decompress) {
       try {
         const compressedBuffer = await streamWithKnownSizeToBuffer(
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/lib/queues/AbstractQueueService.ts` around lines 835 - 860,
When parsedPayload.payloadRef?.codec is present but the decompress function is
undefined, the code must return an explicit error instead of falling through and
treating compressed bytes as JSON; update the branch around codec handling in
AbstractQueueService (the block using parsedPayload.payloadRef?.codec,
decompress, and streamWithKnownSizeToBuffer/streamWithKnownSizeToString) to
detect codec && !decompress and immediately return an Error (with a clear
message and cause if appropriate) indicating "no decompressor provided for codec
<codec>" so the later JSON.parse path (using streamWithKnownSizeToString) is not
attempted on compressed data.
♻️ Duplicate comments (1)
packages/sqs/README.md (1)

872-872: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix incorrect size limit (duplicate) and markdown link syntax.

Two issues on this line:

  1. Incorrect SQS limit (duplicate of past review): States "256 KB" but should be "1 MiB" to match the rest of the README (lines 76, 385, 462, 768, 782, 790) and AWS documentation.

  2. Broken markdown link syntax: The link has backticks around the anchor: [Payload Offloading](`#payload-offloading`) which renders incorrectly. Should be [Payload Offloading](#payload-offloading).

📝 Proposed fix
-- Compressed payloads are still subject to the SQS 256 KB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`). The compressed payload is then stored in S3 and the `payloadRef.codec` field records the algorithm so the consumer can decompress after retrieval without any extra configuration.
+- Compressed payloads are still subject to the SQS 1 MiB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`). The compressed payload is then stored in S3 and the `payloadRef.codec` field records the algorithm so the consumer can decompress after retrieval without any extra configuration.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/sqs/README.md` at line 872, Update the sentence to use the correct
SQS size limit and fix the markdown link: replace "256 KB" with "1 MiB" to match
other README references and AWS docs, and change the link syntax from [Payload
Offloading](`#payload-offloading`) to [Payload Offloading](`#payload-offloading`)
so the anchor renders correctly; ensure the note still mentions combining
compression with the Payload Offloading flow and that payloadRef.codec records
the algorithm.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/codec/README.md`:
- Around line 47-51: The example in README.md uses resolveCodecHandler but does
not import it; update the import list at the top (where buildCodecEnvelope is
imported) to also import resolveCodecHandler (and any other required symbols
like MessageCodecEnum if not present) so the snippet runs as-is; locate the
import statement that currently reads "import { buildCodecEnvelope } from
'`@message-queue-toolkit/codec`'" and add resolveCodecHandler to that named
import.

---

Outside diff comments:
In `@packages/core/lib/queues/AbstractQueueService.ts`:
- Around line 835-860: When parsedPayload.payloadRef?.codec is present but the
decompress function is undefined, the code must return an explicit error instead
of falling through and treating compressed bytes as JSON; update the branch
around codec handling in AbstractQueueService (the block using
parsedPayload.payloadRef?.codec, decompress, and
streamWithKnownSizeToBuffer/streamWithKnownSizeToString) to detect codec &&
!decompress and immediately return an Error (with a clear message and cause if
appropriate) indicating "no decompressor provided for codec <codec>" so the
later JSON.parse path (using streamWithKnownSizeToString) is not attempted on
compressed data.

In `@packages/core/lib/utils/streamUtils.ts`:
- Around line 13-14: The code currently copies chunkBuffer into buffer without
checking bounds, then increments offset by chunkBuffer.length which breaks
offset tracking if the stream exceeds the provided size; in the stream
processing function in streamUtils.ts (the block using variables buffer, offset,
chunkBuffer and size) add a pre-copy check: if offset + chunkBuffer.length >
size, reject/throw an error (or return a failed Promise) indicating the stream
is oversized; otherwise perform Buffer.copy and increment offset by the actual
number of bytes copied (or chunkBuffer.length if safe). Ensure the error/path
prevents silent truncation and surfaces the oversized-stream condition to
callers.

---

Duplicate comments:
In `@packages/sqs/README.md`:
- Line 872: Update the sentence to use the correct SQS size limit and fix the
markdown link: replace "256 KB" with "1 MiB" to match other README references
and AWS docs, and change the link syntax from [Payload
Offloading](`#payload-offloading`) to [Payload Offloading](`#payload-offloading`)
so the anchor renders correctly; ensure the note still mentions combining
compression with the Payload Offloading flow and that payloadRef.codec records
the algorithm.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e7a677e7-4dda-4dd8-8852-fcad4c20b50f

📥 Commits

Reviewing files that changed from the base of the PR and between c1d83e7 and ff779b5.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (28)
  • .github/workflows/ci.yml
  • packages/codec/README.md
  • packages/codec/lib/codec/codecHandler.ts
  • packages/codec/lib/index.ts
  • packages/codec/package.json
  • packages/codec/tsconfig.build.json
  • packages/codec/tsconfig.json
  • packages/core/README.md
  • packages/core/lib/codec/messageCodec.ts
  • packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts
  • packages/core/lib/queues/AbstractQueueService.ts
  • packages/core/lib/types/queueOptionsTypes.ts
  • packages/core/lib/utils/streamUtils.ts
  • packages/core/test/queues/AbstractQueueService.offload.spec.ts
  • packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts
  • packages/gcp-pubsub/package.json
  • packages/sns/README.md
  • packages/sns/lib/sns/AbstractSnsPublisher.ts
  • packages/sns/package.json
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.ts
  • packages/sqs/README.md
  • packages/sqs/lib/index.ts
  • packages/sqs/lib/sqs/AbstractSqsConsumer.ts
  • packages/sqs/lib/sqs/AbstractSqsPublisher.ts
  • packages/sqs/package.json
  • packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.ts
✅ Files skipped from review due to trivial changes (6)
  • packages/gcp-pubsub/package.json
  • packages/codec/tsconfig.json
  • packages/codec/lib/index.ts
  • packages/core/README.md
  • packages/codec/package.json
  • packages/sns/README.md
🚧 Files skipped from review as they are similar to previous changes (8)
  • packages/sqs/lib/index.ts
  • packages/sqs/package.json
  • packages/sqs/test/consumers/SqsPermissionConsumer.ts
  • packages/sns/package.json
  • packages/core/lib/codec/messageCodec.ts
  • packages/core/lib/types/queueOptionsTypes.ts
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts

Comment thread packages/codec/README.md Outdated
For the inline codec path, read the raw SQS message body from an isolated
queue (no consumer) using ReceiveMessageCommand and assert:
- body is a JSON codec envelope with __codec === 'zstd'
- __data decodes from base64 to a valid zstd frame (magic bytes 28 B5 2F FD)

For the codec + payload offloading path, assert:
- SQS message body is a plain JSON pointer (no __codec field), with
  payloadRef.codec === 'zstd' confirming which algorithm was used
- S3 object contains raw compressed binary, not a JSON envelope
  (first 4 bytes match the zstd magic number 0xFD2FB528)

Also add getObjectBuffer() to s3Utils for reading S3 objects as raw
Buffer without UTF-8 decoding, and fix missing resolveCodecHandler import
in codec README example snippet.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Owner

@kibertoad kibertoad left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review — zstd message compression codec

Reviewed for memory efficiency, performance, bugs, and gaps. Most points are inline; the rest are below. Overall the feature is well-structured (single compression pass, self-describing envelope, auto-detection) and well-tested — but there are a few correctness issues worth resolving before merge.

P0 — Bugs / data-loss risks

  1. Inline path can exceed the 256 KB SQS/SNS hard limit — the inline-vs-offload decision compares the compressed size, but the wire body is the ~33%-larger base64 envelope. (inline on both publishers)
  2. Auto-detection false positives are silently dropped / DLQ'd foreverisCodecEnvelope only checks that two loosely-named fields exist; a legitimate message carrying them is misclassified, fails to decompress, and is lost. (inline on messageCodec.ts and AbstractSqsConsumer.ts)
  3. payloadRef.codec is z.string() — accepts unknown codecs that then throw late in the consume path. (inline)
  4. Module-load assertion crashes any importer (incl. type-only) on Node < 22.15. (inline)

P1 — Performance / memory

  1. Compression breaks the streaming-offload memory bound — the payload is materialised 3× in memory; very large messages that previously streamed to disk can now OOM. (inline)
  2. Unconditional compression makes tiny messages larger (frame + base64 overhead). (inline)
  3. isCodecEnvelope rebuilds an array on every consumed message. (inline)
  4. buildCodecEnvelope double-allocates. (inline)
  5. streamWithKnownSizeToBuffer returns a subarray that pins the full allocation. (inline)

P2 — API / extensibility

  1. MessageCodecHandler is documented as a plug-in point but there's no registry. (inline)
  2. Enabling codec silently changes messageSizeThreshold semantics (raw → compressed size). (noted inline on the threshold check)
  3. codec is accepted but silently ignored on Pub/Sub and AMQP publishers. (inline)
  4. Type-cast bypasses codec validation in the consumer. (inline)
  5. payloadRef.size becomes the compressed size — observability discontinuity. (inline)
  6. Re-export from @message-queue-toolkit/sqs duplicates the public surface. (inline)

P3 — Minor / polish

  1. @types/node ^25 vs engines >=22.15. (inline)
  2. __codec / __data are short, collision-prone field names. (inline)
  3. Transient vs. poison failures aren't distinguished on the decompress path. (inline)
  4. The unrelated resolveMessage type-annotation change should be a separate commit. (inline)
  5. Missing test: the README documents that compression can prevent offloading (raw size > threshold, compressed size < threshold), but no test asserts S3 is never touched in that case. The wire-format test doesn't combine an inline codec envelope with payloadStoreConfig set.
  6. The benchmark only prints to console and can't gate regressions in CI. (inline)

Recommended action order

  1. Fix #1 (256 KB ceiling) and #2 (false-positive data loss) — these are correctness, not polish.
  2. Restore streaming for the offload-with-codec path (#5) and add a minSize skip (#6).
  3. Tighten validation around payloadRef.codec (#3) and the consumer decompress dispatch (#13).
  4. Make the codec-handler registry real (#10) before the public API ossifies.
  5. Everything else can land as follow-ups.

Happy to help draft patches for any of these.

}> {
const codec = this.codec

if (codec) {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unconditional compression — small messages get bigger. Every message hits the codec path regardless of size. For tiny payloads (≲200 B) the zstd frame overhead plus the ~33% base64 inflation of the inline envelope means the wire payload grows rather than shrinks. High-throughput small-message systems pay pure CPU + size overhead for no benefit. Consider skipping compression when the raw Buffer.byteLength is below a floor (e.g. ~512 B), ideally exposed as a minSize codec option.

if (codec) {
// Compress once up-front, then decide: offload the compressed bytes or send inline.
const compressed = await resolveCodecHandler(codec).compress(
Buffer.from(JSON.stringify(message), 'utf8'),
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory regression vs. the streaming offload path. Pre-PR, large payloads streamed through JsonStreamStringifySerializer (stream → temp file → stream upload) — bounded memory for arbitrarily large messages. With codec on, the whole payload is materialised three times right here: JSON.stringify(message) (string) → Buffer.from(..., 'utf8') (buffer) → compressed (buffer). For payloads big enough that streaming offload was the entire point (tens/hundreds of MB), this can OOM the publisher. Consider a true streaming path for the offload-with-codec case: JsonStreamStringifyzlib.createZstdCompress() → temp file → store.storePayload.


if (
this.payloadStoreConfig &&
compressed.byteLength > this.payloadStoreConfig.messageSizeThreshold
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inline path can exceed the SQS 256 KB hard limit. The inline-vs-offload decision compares compressed.byteLength against messageSizeThreshold, but the actual wire body is JSON.stringify({ __codec, __data: base64(compressed) }) — roughly compressed × 4/3 plus envelope overhead. With messageSizeThreshold set at/near SQS's limit, a compressed payload just under the threshold produces a wire body well over 256 KB and SQS rejects the publish at runtime. Compare the envelope length (or Math.ceil(compressed.byteLength * 4 / 3) + ENVELOPE_OVERHEAD) against the threshold.

Separately — enabling codec silently changes messageSizeThreshold semantics: without codec it gates on raw size, with codec it gates on compressed size, so offloading triggers far less often. Worth documenting explicitly, and possibly worth separate thresholds.


if (
this.payloadStoreConfig &&
compressed.byteLength > this.payloadStoreConfig.messageSizeThreshold
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same 256 KB-ceiling bug as the SQS publisher (see the comment on AbstractSqsPublisher.prepareOutgoingPayload). compressed.byteLength is compared, but the inline wire body is the base64 envelope (~33% larger), and SNS has the same 256 KB limit. Fix both call sites together — ideally factor the inline-vs-offload decision into a single shared helper so the two publishers can't drift.

Comment thread packages/core/lib/codec/messageCodec.ts Outdated
} as const
export type MessageCodec = ObjectValues<typeof MessageCodecEnum>

const CODEC_FIELD = '__codec'
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__codec / __data are short, un-namespaced field names. Two-underscore prefixes collide easily with ORM/framework internals and with user schemas. Combined with the loose isCodecEnvelope check below, a collision means a real message is mistaken for a codec envelope. Consider a less collidable namespace, e.g. __mqtCodec / __mqtData.

const messageData = Buffer.from(JSON.stringify(message))
return messageData.length
})
const maybeOffloadedPayloadMessage =
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

codec is silently ignored for Pub/Sub (and AMQP). codec lives on CommonQueueOptions, so every publisher's constructor accepts it, but only the SQS/SNS publishers act on it — this Pub/Sub publisher only got the offloadPayload return-type update. A user who sets codec on a Pub/Sub or AMQP publisher gets no compression and no error. Either implement it here too, or throw at construction when codec is set on a publisher that doesn't support it.

Comment thread packages/sqs/lib/index.ts Outdated
@@ -1,3 +1,9 @@
export {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-exporting codec utilities from @message-queue-toolkit/sqs creates a second public import path for symbols that already live in @message-queue-toolkit/codec. Two import paths for the same API complicate future deprecation and invite version skew. Prefer having consumers import from @message-queue-toolkit/codec directly and drop the re-export.

Comment thread packages/codec/package.json Outdated
"@lokalise/biome-config": "^3.1.0",
"@lokalise/tsconfig": "^3.0.0",
"@message-queue-toolkit/core": "workspace:*",
"@types/node": "^25.0.2",
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: @types/node is ^25 while engines.node is >=22.15.0. Usually harmless, but pinning the dev types to a range matching the lowest supported runtime avoids accidentally compiling against APIs newer than Node 22.


// ─── Configuration ────────────────────────────────────────────────────────────

const N = 50
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Useful smoke benchmark, but it only prints to console — it can't catch a regression in CI. Consider saving a baseline (msg/s, overhead %) and asserting against it, or wiring it into vitest bench with thresholds. Also, N = 50 against LocalStack is dominated by network round-trips, so the compression CPU cost is buried in the noise — a separate micro-benchmark of compress/decompress alone would show the actual codec overhead.

}

protected override resolveMessage(message: SQSMessage) {
protected override resolveMessage(
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This resolveMessage return-type tightening is unrelated to the codec feature. Not wrong, but it makes the PR harder to review and to revert independently — ideally a separate commit/PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right that it's unrelated — it slipped in as a drive-by while the file was open. Happy to keep it here since it's a no-op (TypeScript already inferred the same type), but let me know if you'd prefer I revert it and land it separately.

Irfan Hodzic and others added 2 commits May 21, 2026 12:16
Introduce `skipCompressionBelow` on publishers (default: 512 bytes).
When a message's serialized JSON is smaller than this threshold,
compression is skipped and the message is sent as plain JSON instead.
Small messages often expand when compressed due to zstd framing
overhead, so skipping compression avoids that cost by default.

Set to 0 to compress every message regardless of size.

Renames the earlier `minCompressionSize` option before it shipped.
Updates docs in core, SQS and SNS READMEs and strengthens codec
integration tests with wire-format assertions and padding to ensure
messages exceed the default threshold when compression is expected.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When codec and payloadStoreConfig are both set, the previous code
materialised the full payload three times before uploading:
JSON.stringify (string) → Buffer.from (buffer) → compress(buffer).
For payloads large enough to need streaming offload this could OOM.

Adds AbstractQueueService.compressAndOffloadPayload: serialises once
via the configured serialiser, pipes the Readable through
zlib.createZstdCompress() into a temp file, then either streams the
temp file directly to the store (if compressed size exceeds threshold)
or reads the small buffer for an inline codec envelope. Temp file is
always cleaned up in a finally block.

The inline-only path (codec without payloadStoreConfig, bounded by the
256 KB protocol limit) keeps the existing buffer approach unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Irfan Hodzic and others added 4 commits May 21, 2026 13:51
…ermediate object

1. Rename __codec/__data → __mqtCodec/__mqtData
   Two-underscore prefixes collide with ORM/framework internals and user
   schemas; a collision with the loose isCodecEnvelope check would silently
   mistake a real message for a codec envelope. The mq-toolkit namespace
   prefix is unambiguous. Breaking change — existing compressed messages on
   the wire are not readable by the updated consumer.

2. Fix compressAndOffloadPayload threshold comparison (Issue 1)
   The previous check used raw compressedSize; the actual wire body is a
   codec envelope where the compressed bytes are base64-encoded (~×4/3).
   With messageSizeThreshold set near the protocol limit a payload just
   under the threshold could produce an envelope well over the limit and be
   rejected at runtime. Now compares estimated envelope size
   (⌈N×4/3⌉ + 32 + codec.length) against the threshold.

3. Skip intermediate object in buildCodecEnvelope
   JSON.stringify({ __mqtCodec, __mqtData }) allocated a transient object
   between the base64 string and the final envelope string. String
   concatenation avoids that allocation with no observable difference.

4. Document messageSizeThreshold wire-size semantics (Issue 2)
   Without codec the threshold gates on raw JSON size; with codec it gates
   on envelope wire size (base64-encoded compressed payload + JSON framing).
   Enabling codec raises the effective bar for offloading since compression
   shrinks the payload before comparison. Both SinglePayloadStoreConfig and
   MultiPayloadStoreConfig JSDoc now explain this explicitly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…m handler support

- Add MessageCodecRegistration union type (built-in string | { name, handler }) so
  publishers and consumers accept custom codec handlers without forking
- Add createCompressStream() to MessageCodecHandler interface, eliminating
  codec-specific branching in the streaming offload path
- Add KNOWN_CODECS module-level Set (built once from MessageCodecEnum values)
  and expose it from core barrel for hot-path reuse
- Add optional knownCodecs param to isCodecEnvelope for per-consumer scoping
- Make zstd availability check lazy (throw inside compress/decompress, not at import)
- Remove codec/skipCompressionBelow from consumer options; replace with codecs array
- Consumer auto-registers all built-in codecs plus any user-supplied ones;
  throws on unknown codec name in incoming message
- Add getCodecName/resolveCodecHandler helpers for object-form registrations
- Add unit tests for isCodecEnvelope with custom knownCodecs
- Add integration tests for custom codec round-trip and scoped auto-detection
- Update README docs for custom codec registration on both publisher and consumer

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…in decompressMessageBody

Buffer.from(str, 'base64') silently drops non-base64 characters, so a malformed
envelope that bypasses isCodecEnvelope would produce garbage bytes and a confusing
codec error. Now throws a clear 'not valid base64' error before any decode attempt.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Guard PubSub and AMQP publishers against unsupported codec option at
  construction time (throw before super() is called)
- Split retrieval vs poison errors in retrieveOffloadedMessagePayload so
  transient stream failures propagate (retriable) while decompression/
  parse failures return an error (DLQ)
- Remove dead offloadCompressedPayload method superseded by
  compressAndOffloadPayload
- Fix streamUtils.ts backing-store pin: copy buffer when offset != size
- Remove codec re-exports from SQS barrel to avoid dual import-path risk
- Pin @types/node to ^22.0.0 in codec package to match engines.node
- Add micro-benchmark with CI-assertable timing thresholds
- Add tests: base64 validation, PubSub guard, AMQP guard (via minimal
  pass-through subclass since AmqpPermissionPublisher strips unknown opts)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/core/lib/utils/streamUtils.ts (1)

7-20: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Fail fast on payload size mismatch instead of silently truncating/short-returning.

This helper currently accepts underflow/overflow silently. That masks transport/storage integrity issues and can misroute transient read problems into non-retriable decode failures.

Suggested guardrails
 export async function streamWithKnownSizeToBuffer(stream: Readable, size: number): Promise<Buffer> {
   const buffer = Buffer.alloc(size)
   let offset = 0

   for await (const chunk of stream) {
     if (typeof chunk !== 'string' && !Buffer.isBuffer(chunk)) {
       continue
     }

     const chunkBuffer = !Buffer.isBuffer(chunk) ? Buffer.from(chunk, 'utf8') : chunk
+    if (offset + chunkBuffer.length > size) {
+      throw new Error(
+        `Stream produced more data than expected: expected ${size} bytes, received >${size} bytes`,
+      )
+    }
     chunkBuffer.copy(buffer, offset)
     offset += chunkBuffer.length
   }

-  // Copy only when the stream delivered fewer bytes than expected so the
-  // full backing allocation is not retained via a shared-memory view.
-  return offset === size ? buffer : Buffer.from(buffer.subarray(0, offset))
+  if (offset !== size) {
+    throw new Error(
+      `Stream produced fewer data than expected: expected ${size} bytes, received ${offset} bytes`,
+    )
+  }
+  return buffer
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/lib/utils/streamUtils.ts` around lines 7 - 20, The
stream-to-buffer helper currently masks payload size mismatches by truncating or
slicing the allocation; modify the loop and return logic to fail fast: inside
the for-await loop (variables: stream, chunk, chunkBuffer, offset, size, buffer)
throw an Error if a chunk would overflow the expected size (i.e., if offset +
chunkBuffer.length > size) instead of copying, and after the loop throw an Error
if offset !== size (underflow) instead of returning a shorter Buffer; include
the expected size and actual bytes read in the error messages to aid debugging.
♻️ Duplicate comments (1)
packages/sqs/README.md (1)

872-872: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix the incorrect SQS message size limit.

Line 872 states "256 KB", but the current AWS SQS maximum message size is 1 MiB. This contradicts the rest of the README which correctly uses 1 MiB throughout (lines 76, 385, 462, 768, 782, 790).

📝 Proposed fix
-- Compressed payloads are still subject to the SQS 256 KB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`). The compressed payload is then stored in S3 and the `payloadRef.codec` field records the algorithm so the consumer can decompress after retrieval without any extra configuration.
+- Compressed payloads are still subject to the SQS 1 MiB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`). The compressed payload is then stored in S3 and the `payloadRef.codec` field records the algorithm so the consumer can decompress after retrieval without any extra configuration.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/sqs/README.md` at line 872, Update the incorrect SQS message size in
the README sentence that begins "Compressed payloads are still subject to the
SQS 256 KB message size limit." — change "256 KB" to "1 MiB" so the sentence
reads that compressed payloads are subject to the SQS 1 MiB message size limit;
ensure the rest of the sentence about using Payload Offloading and setting
payloadRef.codec remains unchanged.
🧹 Nitpick comments (2)
packages/codec/lib/codec/codecHandler.ts (1)

89-97: ⚡ Quick win

decompressMessageBody can't resolve custom codec registrations.

resolveCodecHandler(envelope.__mqtCodec as MessageCodecRegistration) only handles the built-in zstd enum branch — passing a string like 'lz4' from a custom envelope throws Unsupported codec. Compression accepts a MessageCodecRegistration (string or { name, handler }), but decompression here can't be told about custom handlers, so this exported helper is only usable for built-in codecs. Consider accepting an optional handler/registry parameter so the public API is symmetric with compressMessageBody.

The integrated SQS consumer path uses its own registry (per the stack), so this is not blocking, but the asymmetry is surprising for users calling the helper directly.

♻️ Proposed signature
-export async function decompressMessageBody(envelope: CodecEnvelope): Promise<unknown> {
+export async function decompressMessageBody(
+  envelope: CodecEnvelope,
+  codec?: MessageCodecRegistration,
+): Promise<unknown> {
   if (!BASE64_RE.test(envelope.__mqtData)) {
     throw new Error(`Codec envelope __mqtData is not valid base64 (codec: ${envelope.__mqtCodec})`)
   }
-  const handler = resolveCodecHandler(envelope.__mqtCodec as MessageCodecRegistration)
+  const handler = resolveCodecHandler(codec ?? (envelope.__mqtCodec as MessageCodec))
   const compressed = Buffer.from(envelope.__mqtData, 'base64')
   const decompressed = await handler.decompress(compressed)
   return JSON.parse(decompressed.toString('utf8'))
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/codec/lib/codec/codecHandler.ts` around lines 89 - 97,
decompressMessageBody currently calls resolveCodecHandler(envelope.__mqtCodec as
MessageCodecRegistration) which only resolves built-in codecs and fails for
custom registrations; change the decompressMessageBody signature to accept an
optional codec registry/handler parameter (e.g. second param: codecRegistry or
resolve function) or accept a MessageCodecRegistration directly so callers can
pass custom { name, handler } entries, then use that registry/handler to resolve
the codec instead of the global resolveCodecHandler when provided (fall back to
resolveCodecHandler for built-ins); update any callers (including places that
mirror compressMessageBody) to pass the stack-specific registry used by the SQS
consumer so decompressMessageBody becomes symmetric with compressMessageBody.
packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts (1)

434-471: ⚡ Quick win

Exercise disableCodecAutoDetection with a real codec envelope.

This case currently publishes plain JSON (skipCompressionBelow: 99_999), so it can pass even if codec auto-detection disabling is broken. Consider forcing envelope publish (skipCompressionBelow: 0) and asserting the consumer does not decode/consume it.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts` around lines
434 - 471, The test currently sends plain JSON so disabling auto-detection isn't
exercised; change the SqsPermissionPublisher in this test to force an actual
codec envelope (set skipCompressionBelow: 0 and keep codec:
MessageCodecEnum.ZSTD) so the wire message is compressed/encoded, then verify
the SqsPermissionConsumer started with disableCodecAutoDetection: true does not
auto-decode it — e.g., assert via handlerSpy.waitForMessageWithId that the
consumed payload is the raw codec envelope (or does not match the original plain
JSON object) instead of the decoded PERMISSIONS_ADD_MESSAGE_TYPE; update the
publisher options and the assertion accordingly while keeping the same consumer
configuration (SqsPermissionConsumer.disableCodecAutoDetection).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/codec/lib/codec/codecHandler.ts`:
- Around line 85-87: The buildCodecEnvelope function embeds codecName directly
into a JSON string, which breaks if codecName contains quotes, backslashes, or
control chars; update buildCodecEnvelope to properly escape codecName (e.g., use
JSON.stringify(codecName) when composing the envelope) or enforce/validate
MessageCodecRegistration.name against a safe character set at registration so
only safe names are used; ensure the change targets the buildCodecEnvelope
function and any registration validation logic for
MessageCodecRegistration.name.

In `@packages/core/lib/queues/AbstractQueueService.ts`:
- Around line 800-823: The current publish/offload path in AbstractQueueService
uses blocking fs calls (fs.statSync, fs.readFileSync, fs.unlinkSync) which can
stall the event loop; replace these with their async equivalents (use
fs.promises.stat or fs.promises.lstat to get compressedSize,
fs.promises.readFile to obtain the compressedBuffer when envelopeSize is under
payloadStoreConfig.messageSizeThreshold, and fs.promises.unlink in the finally
block) and ensure the surrounding method (the function that computes
envelopeSize, calls resolveOutgoingStore(), and returns either pointer via
buildPointer(...) or compressedBuffer) properly awaits these promises and
handles errors without changing semantics (preserve envelopeSize calculation
using codecName and compressedSize, and keep the same branch that stores via
store.storePayload when envelopeSize exceeds messageSizeThreshold).

---

Outside diff comments:
In `@packages/core/lib/utils/streamUtils.ts`:
- Around line 7-20: The stream-to-buffer helper currently masks payload size
mismatches by truncating or slicing the allocation; modify the loop and return
logic to fail fast: inside the for-await loop (variables: stream, chunk,
chunkBuffer, offset, size, buffer) throw an Error if a chunk would overflow the
expected size (i.e., if offset + chunkBuffer.length > size) instead of copying,
and after the loop throw an Error if offset !== size (underflow) instead of
returning a shorter Buffer; include the expected size and actual bytes read in
the error messages to aid debugging.

---

Duplicate comments:
In `@packages/sqs/README.md`:
- Line 872: Update the incorrect SQS message size in the README sentence that
begins "Compressed payloads are still subject to the SQS 256 KB message size
limit." — change "256 KB" to "1 MiB" so the sentence reads that compressed
payloads are subject to the SQS 1 MiB message size limit; ensure the rest of the
sentence about using Payload Offloading and setting payloadRef.codec remains
unchanged.

---

Nitpick comments:
In `@packages/codec/lib/codec/codecHandler.ts`:
- Around line 89-97: decompressMessageBody currently calls
resolveCodecHandler(envelope.__mqtCodec as MessageCodecRegistration) which only
resolves built-in codecs and fails for custom registrations; change the
decompressMessageBody signature to accept an optional codec registry/handler
parameter (e.g. second param: codecRegistry or resolve function) or accept a
MessageCodecRegistration directly so callers can pass custom { name, handler }
entries, then use that registry/handler to resolve the codec instead of the
global resolveCodecHandler when provided (fall back to resolveCodecHandler for
built-ins); update any callers (including places that mirror
compressMessageBody) to pass the stack-specific registry used by the SQS
consumer so decompressMessageBody becomes symmetric with compressMessageBody.

In `@packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts`:
- Around line 434-471: The test currently sends plain JSON so disabling
auto-detection isn't exercised; change the SqsPermissionPublisher in this test
to force an actual codec envelope (set skipCompressionBelow: 0 and keep codec:
MessageCodecEnum.ZSTD) so the wire message is compressed/encoded, then verify
the SqsPermissionConsumer started with disableCodecAutoDetection: true does not
auto-decode it — e.g., assert via handlerSpy.waitForMessageWithId that the
consumed payload is the raw codec envelope (or does not match the original plain
JSON object) instead of the decoded PERMISSIONS_ADD_MESSAGE_TYPE; update the
publisher options and the assertion accordingly while keeping the same consumer
configuration (SqsPermissionConsumer.disableCodecAutoDetection).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9ebde88c-6086-4dcc-8df9-e2260925db79

📥 Commits

Reviewing files that changed from the base of the PR and between ff779b5 and ddf36be.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (31)
  • packages/amqp/lib/AbstractAmqpPublisher.ts
  • packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts
  • packages/codec/README.md
  • packages/codec/lib/codec/codecHandler.ts
  • packages/codec/lib/index.ts
  • packages/codec/package.json
  • packages/core/README.md
  • packages/core/lib/codec/messageCodec.ts
  • packages/core/lib/index.ts
  • packages/core/lib/payload-store/offloadedPayloadMessageSchemas.ts
  • packages/core/lib/payload-store/payloadStoreTypes.ts
  • packages/core/lib/queues/AbstractQueueService.ts
  • packages/core/lib/types/queueOptionsTypes.ts
  • packages/core/lib/utils/streamUtils.ts
  • packages/core/test/codec/messageCodec.spec.ts
  • packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts
  • packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.spec.ts
  • packages/sns/README.md
  • packages/sns/lib/sns/AbstractSnsPublisher.ts
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts
  • packages/sqs/README.md
  • packages/sqs/bench/codec.bench.ts
  • packages/sqs/bench/codecMicro.bench.ts
  • packages/sqs/lib/sqs/AbstractSqsConsumer.ts
  • packages/sqs/lib/sqs/AbstractSqsPublisher.ts
  • packages/sqs/test/codec/codecHandler.spec.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.codec.spec.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.ts
  • packages/sqs/test/publishers/SqsPermissionPublisher.ts
  • packages/sqs/test/utils/s3Utils.ts
✅ Files skipped from review due to trivial changes (5)
  • packages/core/lib/payload-store/payloadStoreTypes.ts
  • packages/sqs/test/codec/codecHandler.spec.ts
  • packages/sns/README.md
  • packages/core/README.md
  • packages/codec/README.md

Comment thread packages/core/lib/codec/codecHandler.ts Outdated
Comment thread packages/core/lib/queues/AbstractQueueService.ts Outdated
Irfan Hodzic and others added 2 commits May 21, 2026 17:08
fs.statSync, fs.readFileSync, and fs.unlinkSync in compressAndOffloadPayload
stall the event loop; replace with fs.promises.stat, fs.promises.readFile,
and fs.promises.unlink respectively. Semantics are unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add SAFE_CODEC_NAME_RE guard in getCodecName() for object-form
registrations so names containing JSON-unsafe characters (quotes,
backslashes, whitespace, etc.) throw a clear error at startup instead
of producing malformed envelope JSON at message-send time.

Allowed charset: ASCII letters, digits, hyphens, underscores.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Owner

@kibertoad kibertoad left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: zstd compression codec

Thorough, well-tested feature with a clean commit history. Inline comments below are prioritized [P1] high / [P2] medium / [P3] low. Two non-inline items:

  • PR description is stale. It still shows the { "__codec", "__data" } envelope (renamed to __mqtCodec/__mqtData), @mongodb-js/zstd (replaced by built-in zlib), packages/core/lib/codec/messageCodec.ts as the impl home (moved to packages/codec), and the sqs barrel re-export (later removed). Worth refreshing before merge.
  • No UPGRADING.md / changelog entry for the new @message-queue-toolkit/codec package or the new core exports.

The two most important items are the SNS consumer option leak and the stale consumer docs — both stem from the codeccodecs consumer-API refactor not being fully propagated.

| 'maxRetryDuration'
| 'payloadStoreConfig'
| 'concurrentConsumersAmount'
| 'codec'
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] SNS consumers silently re-expose codec/skipCompressionBelow, defeating the consumer-API redesign.

The SQS consumer deliberately does Omit<QueueConsumerOptions<…>, 'codec' | 'skipCompressionBelow'> and replaces them with a codecs array. But SNSSQSConsumerOptions = SQSConsumerOptions & SNSOptions & {…}, and SNSOptions = QueueOptions<…> re-includes the full CommonQueueOptions — so codec and skipCompressionBelow come back onto SNS consumer options. That's the only reason Pick<…, 'codec'> compiles here.

Concrete harm: passing a custom codec as codec: { name, handler } to an SNS consumer type-checks and sets this.codec, but the consumer decompresses via codecRegistry (built from options.codecs). The handler is never registered → custom-codec messages fail isCodecEnvelope scoping and silently route to the DLQ.

Also: this test and SnsSqsPermissionConsumer.codec.spec.ts pass codec: 'zstd' to the consumer — that only "works" because built-in zstd is auto-registered regardless; the option itself is dead.

Fix: apply the same Omit to the SNS consumer options (or strip them in AbstractSnsSqsConsumer), plumb codecs through this helper, and add an SNS custom-codec test.

Comment thread packages/core/README.md Outdated

const codec = { name: 'lz4', handler: new MyLz4Handler() }
new MyPublisher(deps, { codec })
new MyConsumer(deps, { codec }) // same registration required on the consumer
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Stale consumer docs. Consumer options were changed from codec to a codecs array. new MyConsumer(deps, { codec }) should be new MyConsumer(deps, { codecs: [codec] }). (Built-in zstd needs no consumer option at all — only custom codecs need codecs.)

Comment thread packages/codec/README.md Outdated
new MyPublisher(deps, { codec })

// Consumer — only auto-detects envelopes whose __mqtCodec matches 'lz4'
new MyConsumer(deps, { codec })
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Stale consumer docs. Consumer registration is the codecs array now: new MyConsumer(deps, { codecs: [codec] }), not { codec }.

Comment thread packages/sqs/README.md Outdated
super(deps, {
// Optional: explicitly declare that messages are compressed.
// Without this, consumers still auto-detect and decompress codec envelopes.
codec: MessageCodecEnum.ZSTD,
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Stale / non-compiling consumer example. SQS consumer options omit codec (replaced by codecs), so this snippet would not compile if copied. For built-in zstd a consumer needs no option — auto-detection handles it. The "explicitly declare that messages are compressed" comment describes an option that no longer exists. Use codecs: [...] only for custom codecs.

Comment thread packages/sqs/README.md Outdated
// Optional - Compression (Node.js >=22.15.0 required)
// Auto-detection is always active: consumers decompress codec envelopes
// even without this option set.
codec: MessageCodecEnum.ZSTD,
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Stale consumer config reference. codec is not a valid SQS consumer option (omitted in favor of codecs). Remove it here, or show codecs: [...] for the custom-codec case.

* // Always compress (disable the floor)
* new MyPublisher(deps, { codec: MessageCodecEnum.ZSTD, skipCompressionBelow: 0 })
*/
skipCompressionBelow?: number
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Incomplete docs: skipCompressionBelow is silently ignored when a payload store is configured. compressAndOffloadPayload intentionally skips this floor (documented only in an internal code comment). A user setting skipCompressionBelow: 99999 alongside payloadStoreConfig will be surprised that small messages are still compressed. State this caveat in the user-facing JSDoc here and in the READMEs.

try {
const envelope = resolveMessageResult.result.body
const handler = this.codecRegistry.get(envelope.__mqtCodec)
if (!handler) throw new Error(`Unknown codec: ${envelope.__mqtCodec}`)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P3] Dead check. isCodecEnvelope(body, this.codecKnownNames) on the else if above already guarantees __mqtCodec ∈ codecKnownNames, and codecKnownNames is exactly codecRegistry.keys() — so codecRegistry.get(envelope.__mqtCodec) can't be undefined here. The if (!handler) throw is unreachable. Harmless, but removable.

) {
// `codec` lives on CommonQueueOptions (added after this package's core peer dep was frozen),
// so it is not present in the local type. The cast guards JS callers and future versions.
if ((options as { codec?: unknown }).codec) {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P3] Inconsistency. sqs, sns, and gcp-pubsub migrated their @message-queue-toolkit/core devDependency to workspace:*, but amqp did not — which is why this guard needs the as { codec?: unknown } cast while the PubSub guard uses a clean options.codec. Migrating amqp's core devDep too would let this cast go away and keep the packages consistent.

* check a malformed __mqtData field produces garbage bytes and a confusing codec error
* instead of a clear "invalid envelope" message.
*/
const BASE64_RE =
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P3] Duplicated regex. This BASE64_RE is identical to the one in packages/core/lib/codec/messageCodec.ts. Export one and reuse it to avoid the two definitions drifting.

Comment thread packages/core/lib/codec/codecHandler.ts Outdated
return '{"__mqtCodec":"' + codecName + '","__mqtData":"' + compressed.toString('base64') + '"}'
}

export async function decompressMessageBody(envelope: CodecEnvelope): Promise<unknown> {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P3] decompressMessageBody can't handle custom codecs. resolveCodecHandler throws Unsupported codec for any non-zstd string, so this exported util only works for built-in codecs. The consumer path is unaffected (it uses its registry), but a user calling this public util on a custom-codec envelope gets a confusing error. Either document the built-in-only limitation or accept an optional handler/registry.

P1 — SNS consumer re-exposed codec/skipCompressionBelow via SNSOptions
intersection; strip them with Omit<SNSOptions, …> (not at the top level,
to preserve the SQSConsumerOptions fifoQueue discriminated union).
Update SnsSqsPermissionConsumer fixture and codec spec to use codecs[]
instead of the now-removed codec option.

P1 — Stale README examples updated: consumer codec option → codecs array,
compressed-size threshold description → codec envelope wire size.

P2 — Fix base64 size estimate: Math.ceil(N/3)*4 (exact) instead of
Math.ceil(N*4/3) (underestimates by up to 2 bytes near protocol limit).

P2 — Move dedup check before prepareOutgoingPayload in both SQS and SNS
publishers so duplicates skip compression and S3 upload entirely.

P2 — Bump @message-queue-toolkit/core peer-dep floor to >=25.5.0 in
codec, sqs, and sns packages (codec types were added in this version).

P3 — Export BASE64_RE from core and import it in the codec package to
eliminate the duplicate regex definition.

P3 — Migrate AMQP devDep to workspace:* so options.codec resolves via
the local TypeScript types and the (options as {codec?:unknown}) cast
can be removed.

P3 — Remove dead !handler guard in AbstractSqsConsumer inline-envelope
path (isCodecEnvelope already guarantees the key is in codecRegistry).

P3 — Add JSDoc to decompressMessageBody noting it only handles built-in
codecs; custom-codec decoding goes through the consumer's registry.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
packages/sqs/README.md (1)

865-869: ⚠️ Potential issue | 🟡 Minor | 💤 Low value

Inconsistent SQS message size limit reference.

Line 867 still references "SQS 256 KB message size limit" while the rest of the README consistently uses 1 MiB (e.g., lines 77, 386, 463, 774, 788). Per the past review discussion confirming AWS increased the limit to 1 MiB in August 2025, this should be updated for consistency.

Suggested doc fix
-- Compressed payloads are still subject to the SQS 256 KB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`).
+- Compressed payloads are still subject to the SQS 1 MiB message size limit. For messages that remain oversized after compression, combine with [Payload Offloading](`#payload-offloading`).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/sqs/README.md` around lines 865 - 869, Update the inconsistent size
reference in the README: replace the phrase "SQS 256 KB message size limit" in
the compression docs (the bullet starting "Compressed payloads are still subject
to the SQS 256 KB message size limit") with "SQS 1 MiB message size limit" so it
matches the other mentions of the AWS SQS limit; ensure the surrounding sentence
still reads correctly and keep the guidance about combining with Payload
Offloading and recording payloadRef.codec unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Duplicate comments:
In `@packages/sqs/README.md`:
- Around line 865-869: Update the inconsistent size reference in the README:
replace the phrase "SQS 256 KB message size limit" in the compression docs (the
bullet starting "Compressed payloads are still subject to the SQS 256 KB message
size limit") with "SQS 1 MiB message size limit" so it matches the other
mentions of the AWS SQS limit; ensure the surrounding sentence still reads
correctly and keep the guidance about combining with Payload Offloading and
recording payloadRef.codec unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 271d9dff-8b7a-4755-a975-db3cca3a7b64

📥 Commits

Reviewing files that changed from the base of the PR and between ddf36be and abe0c61.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (19)
  • packages/amqp/lib/AbstractAmqpPublisher.ts
  • packages/amqp/package.json
  • packages/codec/README.md
  • packages/codec/lib/codec/codecHandler.ts
  • packages/codec/package.json
  • packages/core/README.md
  • packages/core/lib/codec/messageCodec.ts
  • packages/core/lib/index.ts
  • packages/core/lib/queues/AbstractQueueService.ts
  • packages/sns/lib/sns/AbstractSnsPublisher.ts
  • packages/sns/lib/sns/AbstractSnsSqsConsumer.ts
  • packages/sns/package.json
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.codec.spec.ts
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.ts
  • packages/sqs/README.md
  • packages/sqs/lib/sqs/AbstractSqsConsumer.ts
  • packages/sqs/lib/sqs/AbstractSqsPublisher.ts
  • packages/sqs/package.json
  • packages/sqs/test/codec/codecHandler.spec.ts
✅ Files skipped from review due to trivial changes (1)
  • packages/codec/README.md

Irfan Hodzic and others added 4 commits May 22, 2026 12:20
…d codec/offload improvements

- Move duplicate prepareOutgoingPayload from AbstractSqsPublisher and AbstractSnsPublisher
  to AbstractQueueService; publishers pre-resolve codec handler/name in constructors to
  avoid circular dependency between core and codec package
- Add buildInlineCodecEnvelope and calculateOutgoingMessageSize to AbstractQueueService;
  publisher subclasses override calculateOutgoingMessageSize with their transport utility
- Add in-memory fast path in compressAndOffloadPayload: small string payloads are
  compressed directly into a Buffer, skipping the temp-file pipeline entirely
- Fix tmpPath construction to use path.join instead of string concatenation
- Add JSDoc caveat to skipCompressionBelow noting it is ignored when payloadStoreConfig is set
- Add test: consumer with disableCodecAutoDetection:true does not decompress real zstd envelope

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…reshold bug

Addresses code-review findings on the zstd message compression feature.

P0 — In-memory fast path in compressAndOffloadPayload could emit an inline
codec envelope larger than messageSizeThreshold (and the 256 KB protocol
limit): it gated on raw size and assumed "compression only shrinks", which is
false for incompressible data. It now compares the base64 envelope wire size
and offloads when it exceeds the threshold, matching the streaming path.

P0 — The codec implementation was effectively a mandatory peer dependency of
sqs/sns (static import) despite being documented as opt-in, breaking every
existing consumer on upgrade. The codec uses only the Node.js built-in zlib
(no native deps), so the standalone @message-queue-toolkit/codec package is
merged into @message-queue-toolkit/core and removed. Nothing extra to install.

P1 — buildCodecEnvelope is no longer duplicated; AbstractQueueService uses the
shared core implementation.

P1 — codec/skipCompressionBelow moved off the shared CommonQueueOptions onto
QueuePublisherOptions; codecs/disableCodecAutoDetection onto the SQS consumer
options. AMQP/Pub-Sub consumers no longer silently accept a codec option.

P1 — Documented the publisher dedup-before-offload ordering trade-off.

P2 — skipCompressionBelow is now honored even when payloadStoreConfig is set:
small messages are offloaded/sent as plain JSON instead of always compressed.

P2 — Removed the misleading "compression only shrinks" comment; corrected the
codecMicro benchmark header (it is a manual benchmark, not a CI gate).

P2 — prepareOutgoingPayload serializes the message once instead of twice.

P3 — An unregistered codec on an offloaded pointer is now a retriable error
(consumer misconfiguration) instead of being routed to the DLQ as poison.

P3 — Documented the os.tmpdir() requirement of the streaming offload path and
widened the codec test padding margin.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…helpers, docs

P1 — An inline message whose `__mqtCodec` named an unregistered codec was
aborted (poison treatment) and silently dropped, while the offloaded-payload
path treated the same misconfiguration as retriable. The inline path now
throws, so the message stays on the queue until the codec is registered —
consistent with retrieveOffloadedMessagePayload.

P2 — Documented the deliberate buffer-based decompression (no
createDecompressStream: the consumer must JSON.parse the whole payload anyway)
on MessageCodecHandler and in the SQS README. Corrected the stale
`disableCodecAutoDetection` JSDoc (detection is presence-based via
hasCodecEnvelopeShape, not an exact two-field match) and warned that, with it
enabled, a compressed message can pass a lenient schema as an incomplete one.

P3 — Removed compressMessageBody / decompressMessageBody: test-only helpers
that leaked into the public API, duplicated the exported buildCodecEnvelope /
resolveCodecHandler primitives, and were footguns (no preserved routing
fields; built-in codecs only). Tests now build envelopes from the primitives.
Clarified the 256 KB transport-limit wording, made estimateCodecEnvelopeSize
measure UTF-8 bytes consistently, and removed a stray packages/sqs/NUL file.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
A message envelope (inline body or offloaded-payload pointer) naming a
codec the consumer has not registered is a deployment misconfiguration
that will not be fixed within the SQS redelivery window. Retrying it just
burns receive-count attempts before the message lands in the DLQ anyway —
or, with no DLQ, spams the error reporter every visibility cycle until the
message expires.

deserializeMessage's missing-codec throw is now caught in handleMessage
and funneled through the existing abort path: the error is logged/reported
once and the message is routed to the DLQ (if configured) so it can be
redriven after the codec is deployed. Inline and offloaded paths stay
consistent — both terminal.

This also fixes the CI failure in SqsPermissionConsumer.codec.spec.ts,
whose assertion that an error is recorded could not hold while the throw
bypassed handleMessageProcessed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@kibertoad kibertoad merged commit 2702e74 into kibertoad:main May 22, 2026
19 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants