Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0794af6
feat: add zstd message compression codec for SNS and SQS
May 18, 2026
b3b0fcf
refactor: move zstd implementation out of core, harden codec guardrai…
May 18, 2026
9950db2
ci: rebuild @mongodb-js/zstd native binary after --ignore-scripts ins…
May 19, 2026
2dfdb6e
feat(sqs): add codec benchmarks for publish and consume throughput
May 19, 2026
c1d83e7
feat(sqs/core): replace @mongodb-js/zstd with Node.js built-in zlib a…
May 19, 2026
12897ca
feat: extract codec into standalone @message-queue-toolkit/codec package
May 19, 2026
094dc5d
fix(codec): tighten Node.js version requirement to >=22.15.0
May 19, 2026
69bf6cc
Merge branch 'main' into main
irfanh94 May 19, 2026
ff779b5
refactor: compress-once in publish, split offload into focused helpers
May 19, 2026
989165f
test: verify wire format of compressed messages in integration tests
May 19, 2026
17252f9
feat: add skipCompressionBelow option with default of 512 bytes
May 21, 2026
4e03713
perf: stream codec+offload path to avoid 3× buffer materialisation
May 21, 2026
98acde2
fix(codec): rename envelope fields, fix wire-size threshold, skip int…
May 21, 2026
8c4a538
feat(codec): extensible codec system with consumer registry and custo…
May 21, 2026
89d37ae
fix(codec): validate __mqtData is well-formed base64 before decoding …
May 21, 2026
ddf36be
fix(codec): address code-review comments on codec implementation
May 21, 2026
a938f91
perf(core): replace blocking fs sync calls with async equivalents
May 21, 2026
1f34766
feat(codec): validate custom codec names at registration time
May 21, 2026
abe0c61
fix(codec): address second round of code-review comments
May 22, 2026
d265365
refactor: hoist prepareOutgoingPayload to AbstractQueueService and ad…
May 22, 2026
18ae982
fix(codec): address PR review — merge codec into core, fix offload th…
kibertoad May 22, 2026
084666a
Extra refinement
kibertoad May 22, 2026
b3ae7c4
fix(codec): address review — retriable missing-codec, drop redundant …
kibertoad May 22, 2026
29c5e36
fix(codec): treat unregistered-codec messages as invalid, not retriable
kibertoad May 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ensure-labels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ jobs:
- name: Check one of required labels are set
uses: docker://agilepathway/pull-request-label-checker:v1.6.65
with:
one_of: major,minor,patch,skip-release
one_of: major,minor,patch,skip-release,release-same-version
repo_token: ${{ secrets.GITHUB_TOKEN }}
30 changes: 26 additions & 4 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ jobs:
has_changes: ${{ steps.finalize.outputs.has_changes }}
bump: ${{ steps.finalize.outputs.bump }}
should_publish: ${{ steps.finalize.outputs.should_publish }}
same_version: ${{ steps.finalize.outputs.same_version }}
steps:
- name: Set default outputs
id: defaults
Expand All @@ -31,6 +32,7 @@ jobs:
echo "has_changes=false" >> $GITHUB_OUTPUT
echo "bump=patch" >> $GITHUB_OUTPUT
echo "should_publish=false" >> $GITHUB_OUTPUT
echo "same_version=false" >> $GITHUB_OUTPUT

- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
Expand Down Expand Up @@ -79,11 +81,11 @@ jobs:
echo "PR labels: $LABELS"
echo "labels=$LABELS" >> $GITHUB_OUTPUT

# Check if PR has version bump labels
if echo "$LABELS" | grep -qE '\b(patch|minor|major)\b'; then
# Check if PR has a release label (a version-bump label or release-same-version)
if echo "$LABELS" | grep -qE '\b(patch|minor|major|release-same-version)\b'; then
echo "should_publish=true" >> $GITHUB_OUTPUT
else
echo "No version bump label found (patch/minor/major)"
echo "No release label found (patch/minor/major/release-same-version)"
echo "should_publish=false" >> $GITHUB_OUTPUT
fi

Expand All @@ -93,13 +95,23 @@ jobs:
env:
LABELS: ${{ steps.pr-info.outputs.labels }}
run: |
if echo "$LABELS" | grep -qE '\bmajor\b'; then
# release-same-version takes precedence: publish the versions already in
# package.json as-is, with no bump. Use it when versions were set explicitly
# in the PR (e.g. a deliberate, hand-committed semver-major bump).
if echo "$LABELS" | grep -qE '\brelease-same-version\b'; then
echo "same_version=true" >> $GITHUB_OUTPUT
echo "bump=none" >> $GITHUB_OUTPUT
echo "Release mode: publish current package.json versions without bumping"
elif echo "$LABELS" | grep -qE '\bmajor\b'; then
echo "same_version=false" >> $GITHUB_OUTPUT
echo "bump=major" >> $GITHUB_OUTPUT
echo "Version bump: major"
elif echo "$LABELS" | grep -qE '\bminor\b'; then
echo "same_version=false" >> $GITHUB_OUTPUT
echo "bump=minor" >> $GITHUB_OUTPUT
echo "Version bump: minor"
else
echo "same_version=false" >> $GITHUB_OUTPUT
echo "bump=patch" >> $GITHUB_OUTPUT
echo "Version bump: patch"
fi
Expand Down Expand Up @@ -240,6 +252,8 @@ jobs:
DEFAULT_BUMP: ${{ steps.defaults.outputs.bump }}
PR_SHOULD_PUBLISH: ${{ steps.pr-info.outputs.should_publish }}
DEFAULT_SHOULD_PUBLISH: ${{ steps.defaults.outputs.should_publish }}
VERSION_SAME: ${{ steps.version.outputs.same_version }}
DEFAULT_SAME_VERSION: ${{ steps.defaults.outputs.same_version }}
run: |
# Use build-matrix outputs if available, otherwise defaults
if [ -n "$BUILD_MATRIX" ]; then
Expand All @@ -266,6 +280,12 @@ jobs:
echo "should_publish=$DEFAULT_SHOULD_PUBLISH" >> $GITHUB_OUTPUT
fi

if [ -n "$VERSION_SAME" ]; then
echo "same_version=$VERSION_SAME" >> $GITHUB_OUTPUT
else
echo "same_version=$DEFAULT_SAME_VERSION" >> $GITHUB_OUTPUT
fi

# Single job that bumps, publishes, and pushes tags/commits at the end
# This avoids the "checkout wrong commit" problem of multi-job workflows
release:
Expand Down Expand Up @@ -305,8 +325,10 @@ jobs:
- name: Install dependencies
run: pnpm install --frozen-lockfile --ignore-scripts

# Skipped for release-same-version: the versions in package.json are published as-is.
- name: Bump versions for changed packages
id: bump
if: needs.detect-changes.outputs.same_version != 'true'
env:
MATRIX: ${{ needs.detect-changes.outputs.matrix }}
BUMP: ${{ needs.detect-changes.outputs.bump }}
Expand Down
14 changes: 13 additions & 1 deletion biome.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,17 @@
"noUnusedPrivateClassMembers": "off"
}
}
}
},
"overrides": [
{
"includes": ["**/bench/**"],
"linter": {
"rules": {
"suspicious": {
"noConsole": "off"
}
}
}
}
]
}
6 changes: 6 additions & 0 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ export abstract class AbstractAmqpPublisher<
dependencies: AMQPDependencies,
options: AMQPPublisherOptions<MessagePayloadType, CreationConfig, LocatorConfig>,
) {
if (options.codec) {
throw new Error(
'codec is not supported by AbstractAmqpPublisher. Remove the codec option or use an SQS/SNS publisher.',
)
}

super(dependencies, options)

this.messageSchemaContainer = this.resolvePublisherMessageSchemaContainer(options)
Expand Down
4 changes: 2 additions & 2 deletions packages/amqp/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/amqp",
"version": "24.0.0",
"version": "24.1.0",
"engines": {
"node": ">=18"
},
Expand Down Expand Up @@ -43,7 +43,7 @@
"@biomejs/biome": "^2.3.8",
"@lokalise/biome-config": "^3.1.0",
"@lokalise/tsconfig": "^3.0.0",
"@message-queue-toolkit/core": "*",
"@message-queue-toolkit/core": "workspace:*",
"@types/amqplib": "0.10.8",
"@types/node": "^25.5.0",
"@vitest/coverage-v8": "^4.0.18",
Expand Down
12 changes: 12 additions & 0 deletions packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { asClass, asFunction, Lifetime } from 'awilix'
import { asMockFunction } from 'awilix-manager'
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest'
import { ZodError } from 'zod/v4'
import { AbstractAmqpQueuePublisher } from '../../lib/AbstractAmqpQueuePublisher.ts'
import { deserializeAmqpMessage } from '../../lib/amqpMessageDeserializer.ts'
import { AmqpPermissionConsumer } from '../consumers/AmqpPermissionConsumer.ts'
import type {
Expand All @@ -25,6 +26,17 @@ import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext.ts'
import { AmqpPermissionPublisher } from './AmqpPermissionPublisher.ts'

describe('PermissionPublisher', () => {
describe('constructor', () => {
it('throws when codec option is set (codec is not supported by AMQP publishers)', () => {
// AmqpPermissionPublisher strips unknown options before calling super(), so we test
// the guard via a minimal pass-through subclass that mirrors real user code.
class TestPublisher extends AbstractAmqpQueuePublisher<{ messageType: string }> {}
expect(() => new TestPublisher({} as any, { codec: 'zstd' } as any)).toThrow(
'codec is not supported by AbstractAmqpPublisher',
)
})
})

describe('logging', () => {
let logger: FakeLogger
let diContainer: AwilixContainer<Dependencies>
Expand Down
52 changes: 52 additions & 0 deletions packages/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,58 @@ class MyPayloadStore implements PayloadStore {
}
```

#### Message compression (codec)

Publishers can compress outgoing messages by setting `codec` in their options. The codec implementation (zstd via the Node.js built-in `zlib` module) ships inside `@message-queue-toolkit/core` — there is no extra package to install. Requires **Node.js >=22.15.0**. Only the SQS and SNS adapters support compression.

**Built-in zstd:**

```typescript
import { MessageCodecEnum } from '@message-queue-toolkit/core'

new MyPublisher(deps, {
codec: MessageCodecEnum.ZSTD,
// Optional: skip compression for messages below this byte threshold (default: 512).
// Small messages often expand when compressed; set to 0 to always compress.
skipCompressionBelow: 512,
})
```

**Custom codec** (bring your own compression library):

```typescript
import type { MessageCodecHandler } from '@message-queue-toolkit/core'

class MyLz4Handler implements MessageCodecHandler {
async compress(data: Buffer): Promise<Buffer> { /* ... */ }
async decompress(data: Buffer): Promise<Buffer> { /* ... */ }
createCompressStream(): Transform { /* return a Transform stream */ }
}

const codec = { name: 'lz4', handler: new MyLz4Handler() }
new MyPublisher(deps, { codec })
new MyConsumer(deps, { codecs: [codec] }) // register custom codec on the consumer
```

Compressed messages are wrapped in a self-describing envelope `{ __mqtCodec: '<name>', __mqtData: '<base64>', ...preserved fields }`. The message's identity/routing fields (`id`, `timestamp`, `type`, deduplication fields) are copied alongside `__mqtData` as plaintext — the same fields an offloaded-payload pointer preserves — so broker-side filtering (e.g. SNS body-scoped `FilterPolicy`) keeps working on them. Built-in codecs (e.g. zstd) are auto-detected on every consumer — no consumer option needed. For custom codecs, pass `codecs: [{ name, handler }]` to register them on the consumer.

> **Roll out consumers before publishers:** auto-detection only works on a consumer running a library version that supports the codec (and, for custom codecs, with that codec registered). Upgrade all consumers of a queue first, then enable `codec` on publishers.

#### Interaction with codec (compression)

When both `codec` and `payloadStoreConfig` are set on a publisher, compression and offloading work together with a single compression pass:

1. The message is compressed **once** at publish time.
2. The **codec envelope wire size** (base64-encoded compressed bytes + JSON framing) is compared against `messageSizeThreshold`.
3. If the envelope size exceeds the threshold, the raw compressed bytes are stored in the payload store. The codec name is written to `payloadRef.codec` so the consumer knows how to decompress after retrieval.
4. If the envelope size fits within the threshold, the message is sent inline as a self-describing codec envelope — S3 is never touched.

`skipCompressionBelow` is honored here too: a message whose serialized JSON is below the threshold skips compression entirely and is offloaded (or sent inline) as plain JSON.

> **Note:** for large payloads the compress-and-offload path streams the message through a temporary file under `os.tmpdir()` to avoid buffering the whole payload in memory. The temp file is always removed in a `finally` block. Environments with a read-only or unavailable temp directory (rare; AWS Lambda's `/tmp` is writable) cannot use the codec + payload-store combination.

This means compression can prevent offloading entirely for messages that are large before compression but small after.

## API Reference

### Types
Expand Down
129 changes: 129 additions & 0 deletions packages/core/lib/codec/codecHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import type { Transform } from 'node:stream'
import { promisify } from 'node:util'
import zlib from 'node:zlib'
import type { MessageCodecHandler, MessageCodecRegistration } from './messageCodec.ts'
import { MessageCodecEnum } from './messageCodec.ts'

const ZSTD_UNSUPPORTED_MSG =
'zlib.zstdCompress and zlib.zstdDecompress are not available in this Node.js version. ' +
'Message compression requires Node.js >=22.15.0 or >=23.8.0.'

/**
* Default upper bound on the decompressed size of a single message, in bytes (100 MiB).
*
* Protects consumers from decompression-bomb inputs: a tiny compressed envelope can
* otherwise expand to gigabytes of highly-repetitive data and exhaust process memory.
* 100 MiB is far above any realistic queue message (SQS/SNS cap bodies at 256 KiB, and
* even offloaded payloads are typically single-digit MiB) while still bounding the blast
* radius of a malicious or corrupt frame. Override via the {@link ZstdCodecHandler}
* constructor if you legitimately handle larger messages.
*/
export const DEFAULT_MAX_DECOMPRESSED_BYTES = 100 * 1024 * 1024

// Resolved lazily — undefined on Node versions that lack zstd support.
// Keeping these lazy means importing core never throws on older Node; only an
// actual compress/decompress call does, and only when zstd is genuinely used.
const zstdCompress =
typeof zlib.zstdCompress === 'function' ? promisify(zlib.zstdCompress) : undefined
const zstdDecompress =
typeof zlib.zstdDecompress === 'function' ? promisify(zlib.zstdDecompress) : undefined

export class ZstdCodecHandler implements MessageCodecHandler {
Comment thread
irfanh94 marked this conversation as resolved.
Copy link
Copy Markdown
Collaborator

@CarlosGamero CarlosGamero May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we’re no longer relying on external libraries for compression, we might consider moving this utility into the core. That would allow us to reuse it across other modules like amp or gcp-pubsup without having to reimplement it each time.

@kibertoad tagging you to get your thoughts on this, especially since you previously suggested moving it out of core 😓

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I just saw the comment about a separate package. I agree with that as well—ultimately, we’re both talking about the same thing: enabling compression support across the different technologies we support.

One open question I’m still unsure about: does it really make sense to create a separate package if we’re going to rely on Node internals anyway? If we keep it in core, customers don’t need to install additional dependencies they may not use. From that perspective, having it in core feels sufficient, but I don’t have a strong opinion either way.

private readonly maxDecompressedBytes: number

/**
* @param maxDecompressedBytes upper bound on a single decompressed message, in bytes.
* Defaults to {@link DEFAULT_MAX_DECOMPRESSED_BYTES} (100 MiB). Decompression of an
* input that would exceed this limit is rejected before the full payload is buffered.
*/
constructor(maxDecompressedBytes: number = DEFAULT_MAX_DECOMPRESSED_BYTES) {
this.maxDecompressedBytes = maxDecompressedBytes
}

compress(data: Buffer): Promise<Buffer> {
if (!zstdCompress) throw new Error(ZSTD_UNSUPPORTED_MSG)
return zstdCompress(data)
}

decompress(data: Buffer): Promise<Buffer> {
if (!zstdDecompress) throw new Error(ZSTD_UNSUPPORTED_MSG)
// maxOutputLength caps the decompressed size: zstdDecompress rejects with a
// RangeError once the limit is exceeded, guarding against decompression bombs.
return zstdDecompress(data, { maxOutputLength: this.maxDecompressedBytes })
}

createCompressStream(): Transform {
if (typeof zlib.createZstdCompress !== 'function') throw new Error(ZSTD_UNSUPPORTED_MSG)
return zlib.createZstdCompress()
}
}

const ZSTD_HANDLER = new ZstdCodecHandler()

/**
* Allowed characters for a custom codec name: ASCII letters, digits, hyphens, underscores.
* This keeps the name JSON-safe without escaping and makes it a recognisable identifier.
*/
const SAFE_CODEC_NAME_RE = /^[A-Za-z0-9_-]+$/

/**
* Returns the name string that will be written into the `__mqtCodec` field of every envelope.
* Throws for custom (object-form) registrations whose name contains characters that would
* produce invalid JSON when interpolated raw into the envelope string.
*/
export function getCodecName(codec: MessageCodecRegistration): string {
if (typeof codec === 'object') {
if (!SAFE_CODEC_NAME_RE.test(codec.name)) {
throw new Error(
`Invalid codec name "${codec.name}": only ASCII letters, digits, hyphens, and underscores are allowed`,
)
}
return codec.name
}
return codec
}

/**
* Resolves the {@link MessageCodecHandler} for the given codec registration.
*
* - String form (`MessageCodec`): returns the built-in handler for that codec.
* - Object form (`{ name, handler }`): returns the provided handler directly.
*/
export function resolveCodecHandler(codec: MessageCodecRegistration): MessageCodecHandler {
if (typeof codec === 'object') return codec.handler
if (codec === MessageCodecEnum.ZSTD) return ZSTD_HANDLER
throw new Error(`Unsupported codec: ${codec}`)
}

/**
* Wraps an already-compressed buffer in a codec envelope string.
* Use this when you have pre-compressed bytes and want to avoid compressing twice.
*
* `preservedFields`, when provided, are emitted as plaintext siblings of the codec
* fields (`{ ...preserved, __mqtCodec, __mqtData }`). Publishers use this to keep
* identity/routing fields (`id`, `type`, …) visible on the wire so broker-side
* filtering (e.g. SNS body-scoped FilterPolicy) still works on compressed messages —
* the same fields an offloaded-payload pointer carries. The codec fields are written
* last, so a colliding preserved key can never corrupt the envelope; consumers ignore
* the preserved siblings and decode `__mqtData` only.
*
* Without `preservedFields` the fast path uses string concatenation instead of
* JSON.stringify, avoiding an intermediate object — the base64 string and the
* envelope string are the only two allocations on the inline path.
*
* `codecName` must already be a JSON-safe identifier (see {@link getCodecName},
* which is enforced for every registration before it reaches this function).
*/
export function buildCodecEnvelope(
compressed: Buffer,
codecName: string,
preservedFields?: Record<string, unknown>,
): string {
const data = compressed.toString('base64')
if (!preservedFields || Object.keys(preservedFields).length === 0) {
return `{"__mqtCodec":"${codecName}","__mqtData":"${data}"}`
}
// Preserved fields present: a single JSON.stringify handles all value escaping.
// Codec fields are listed last so they always win over any colliding preserved key.
return JSON.stringify({ ...preservedFields, __mqtCodec: codecName, __mqtData: data })
}
Loading
Loading