Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,22 @@ const result = await myConsumer.handlerSpy.waitForMessageWithId('1')
expect(result.processingResult).toEqual({ status: 'consumed' })
```

### Counters

For load-testing scenarios where retaining all messages in the buffer may be too memory-intensive, handler spies provide lightweight counters that track how many messages were processed with each status:

```ts
const counts = myConsumer.handlerSpy.counts
// {
// consumed: 150,
// published: 0,
// retryLater: 3,
// error: 1,
// }
```

Counters are incremented for every processed message regardless of `bufferSize`, so you get accurate statistics even when older messages have been evicted from the buffer. Calling `clear()` resets both the buffer and the counters.

## Message Logging

When `logMessages` is enabled, processed messages are logged at the `debug` level with structured metadata. For privacy reasons, the full message payload is **not logged by default** to avoid exposing sensitive data.
Expand Down
1 change: 1 addition & 0 deletions packages/core/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export {
type HandlerSpyParams,
type PublicHandlerSpy,
resolveHandlerSpy,
type SpyResultCounts,
type SpyResultInput,
TYPE_NOT_RESOLVED,
} from './queues/HandlerSpy.ts'
Expand Down
16 changes: 16 additions & 0 deletions packages/core/lib/queues/HandlerSpy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ export type HandlerSpyParams = {
messageIdField?: string
}

export type SpyResultCounts = {
consumed: number
published: number
retryLater: number
error: number
}

export type SpyResultInput<MessagePayloadSchemas extends object> = {
message: MessagePayloadSchemas | null
processingResult: MessageProcessingResult
Expand Down Expand Up @@ -87,12 +94,14 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
private readonly messageBuffer: Fifo<SpyResultInput<any>>
private readonly messageIdField: keyof MessagePayloadSchemas
private readonly spyPromises: SpyPromiseMetadata<MessagePayloadSchemas>[]
private _counts: SpyResultCounts

constructor(params: HandlerSpyParams = {}) {
this.messageBuffer = new Fifo(params.bufferSize ?? 100)
// @ts-expect-error
this.messageIdField = params.messageIdField ?? 'id'
this.spyPromises = []
this._counts = { consumed: 0, published: 0, retryLater: 0, error: 0 }
}

private messageMatchesFilter<T extends object>(
Expand Down Expand Up @@ -159,8 +168,13 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
return spyPromise
}

get counts(): SpyResultCounts {
return { ...this._counts }
}

clear() {
this.messageBuffer.clear()
this._counts = { consumed: 0, published: 0, retryLater: 0, error: 0 }
}

getAllReceivedMessages(): SpyResultOutput<MessagePayloadSchemas>[] {
Expand Down Expand Up @@ -199,6 +213,8 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
},
} as SpyResultOutput<MessagePayloadSchemas>)

this._counts[processingResult.processingResult.status]++

const cacheId = `${resolvedMessageId}-${Date.now()}-${(Math.random() + 1)
.toString(36)
.substring(7)}`
Expand Down
133 changes: 133 additions & 0 deletions packages/core/test/queues/HandlerSpy.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,139 @@ describe('HandlerSpy', () => {
})
})

describe('counts', () => {
it('Starts with all zeroes', () => {
const spy = new HandlerSpy<Message>()

expect(spy.counts).toEqual({
consumed: 0,
published: 0,
retryLater: 0,
error: 0,
})
})

it('Increments consumed counter', () => {
const spy = new HandlerSpy<Message>()

spy.addProcessedMessage(
{ processingResult: { status: 'consumed' }, message: TEST_MESSAGE },
undefined,
'test.type',
)

expect(spy.counts.consumed).toBe(1)
expect(spy.counts.error).toBe(0)
})

it('Increments error counter', () => {
const spy = new HandlerSpy<Message>()

spy.addProcessedMessage(
{ processingResult: { status: 'error', errorReason: 'invalidMessage' }, message: null },
'abc',
TYPE_NOT_RESOLVED,
)

expect(spy.counts.error).toBe(1)
expect(spy.counts.consumed).toBe(0)
})

it('Increments retryLater counter', () => {
const spy = new HandlerSpy<Message>()

spy.addProcessedMessage(
{ processingResult: { status: 'retryLater' }, message: TEST_MESSAGE },
undefined,
'test.type',
)

expect(spy.counts.retryLater).toBe(1)
})

it('Increments published counter', () => {
const spy = new HandlerSpy<Message>()

spy.addProcessedMessage(
{ processingResult: { status: 'published' }, message: TEST_MESSAGE },
undefined,
'test.type',
)

expect(spy.counts.published).toBe(1)
})

it('Tracks multiple statuses correctly', () => {
const spy = new HandlerSpy<Message>()

spy.addProcessedMessage(
{ processingResult: { status: 'consumed' }, message: TEST_MESSAGE },
undefined,
'test.type',
)
spy.addProcessedMessage(
{ processingResult: { status: 'consumed' }, message: TEST_MESSAGE_2 },
undefined,
'test.type',
)
spy.addProcessedMessage(
{ processingResult: { status: 'error', errorReason: 'handlerError' }, message: null },
'err1',
TYPE_NOT_RESOLVED,
)
spy.addProcessedMessage(
{ processingResult: { status: 'retryLater' }, message: TEST_MESSAGE },
undefined,
'test.type',
)

expect(spy.counts).toEqual({
consumed: 2,
published: 0,
retryLater: 1,
error: 1,
})
})

it('Resets counts on clear', () => {
const spy = new HandlerSpy<Message>()

spy.addProcessedMessage(
{ processingResult: { status: 'consumed' }, message: TEST_MESSAGE },
undefined,
'test.type',
)
spy.addProcessedMessage(
{ processingResult: { status: 'error', errorReason: 'invalidMessage' }, message: null },
'abc',
TYPE_NOT_RESOLVED,
)

spy.clear()

expect(spy.counts).toEqual({
consumed: 0,
published: 0,
retryLater: 0,
error: 0,
})
})

it('Returns a copy, not a reference', () => {
const spy = new HandlerSpy<Message>()

const countsBefore = spy.counts
spy.addProcessedMessage(
{ processingResult: { status: 'consumed' }, message: TEST_MESSAGE },
undefined,
'test.type',
)

expect(countsBefore.consumed).toBe(0)
expect(spy.counts.consumed).toBe(1)
})
})

describe('isHandlerSpy', () => {
it('HandlerSpy returns true', () => {
const spy = new HandlerSpy<Message>()
Expand Down
Loading