diff --git a/README.md b/README.md index 3d611273..cbe0af97 100644 --- a/README.md +++ b/README.md @@ -651,6 +651,22 @@ const result = await myConsumer.handlerSpy.waitForMessageWithId('1') expect(result.processingResult).toEqual({ status: 'consumed' }) ``` +### Counters + +For load-testing scenarios where retaining all messages in the buffer may be too memory-intensive, handler spies provide lightweight counters that track how many messages were processed with each status: + +```ts +const counts = myConsumer.handlerSpy.counts +// { +// consumed: 150, +// published: 0, +// retryLater: 3, +// error: 1, +// } +``` + +Counters are incremented for every processed message regardless of `bufferSize`, so you get accurate statistics even when older messages have been evicted from the buffer. Calling `clear()` resets both the buffer and the counters. + ## Message Logging When `logMessages` is enabled, processed messages are logged at the `debug` level with structured metadata. For privacy reasons, the full message payload is **not logged by default** to avoid exposing sensitive data. diff --git a/packages/core/lib/index.ts b/packages/core/lib/index.ts index b9d29a8d..bb97ea60 100644 --- a/packages/core/lib/index.ts +++ b/packages/core/lib/index.ts @@ -69,6 +69,7 @@ export { type HandlerSpyParams, type PublicHandlerSpy, resolveHandlerSpy, + type SpyResultCounts, type SpyResultInput, TYPE_NOT_RESOLVED, } from './queues/HandlerSpy.ts' diff --git a/packages/core/lib/queues/HandlerSpy.ts b/packages/core/lib/queues/HandlerSpy.ts index 2573bb18..70215a0b 100644 --- a/packages/core/lib/queues/HandlerSpy.ts +++ b/packages/core/lib/queues/HandlerSpy.ts @@ -35,6 +35,13 @@ export type HandlerSpyParams = { messageIdField?: string } +export type SpyResultCounts = { + consumed: number + published: number + retryLater: number + error: number +} + export type SpyResultInput = { message: MessagePayloadSchemas | null processingResult: MessageProcessingResult @@ -87,12 +94,14 @@ export class HandlerSpy { private readonly messageBuffer: Fifo> private readonly messageIdField: keyof MessagePayloadSchemas private readonly spyPromises: SpyPromiseMetadata[] + private _counts: SpyResultCounts constructor(params: HandlerSpyParams = {}) { this.messageBuffer = new Fifo(params.bufferSize ?? 100) // @ts-expect-error this.messageIdField = params.messageIdField ?? 'id' this.spyPromises = [] + this._counts = { consumed: 0, published: 0, retryLater: 0, error: 0 } } private messageMatchesFilter( @@ -159,8 +168,13 @@ export class HandlerSpy { return spyPromise } + get counts(): SpyResultCounts { + return { ...this._counts } + } + clear() { this.messageBuffer.clear() + this._counts = { consumed: 0, published: 0, retryLater: 0, error: 0 } } getAllReceivedMessages(): SpyResultOutput[] { @@ -199,6 +213,8 @@ export class HandlerSpy { }, } as SpyResultOutput) + this._counts[processingResult.processingResult.status]++ + const cacheId = `${resolvedMessageId}-${Date.now()}-${(Math.random() + 1) .toString(36) .substring(7)}` diff --git a/packages/core/test/queues/HandlerSpy.spec.ts b/packages/core/test/queues/HandlerSpy.spec.ts index 0c5dd353..97fdce11 100644 --- a/packages/core/test/queues/HandlerSpy.spec.ts +++ b/packages/core/test/queues/HandlerSpy.spec.ts @@ -429,6 +429,139 @@ describe('HandlerSpy', () => { }) }) + describe('counts', () => { + it('Starts with all zeroes', () => { + const spy = new HandlerSpy() + + expect(spy.counts).toEqual({ + consumed: 0, + published: 0, + retryLater: 0, + error: 0, + }) + }) + + it('Increments consumed counter', () => { + const spy = new HandlerSpy() + + spy.addProcessedMessage( + { processingResult: { status: 'consumed' }, message: TEST_MESSAGE }, + undefined, + 'test.type', + ) + + expect(spy.counts.consumed).toBe(1) + expect(spy.counts.error).toBe(0) + }) + + it('Increments error counter', () => { + const spy = new HandlerSpy() + + spy.addProcessedMessage( + { processingResult: { status: 'error', errorReason: 'invalidMessage' }, message: null }, + 'abc', + TYPE_NOT_RESOLVED, + ) + + expect(spy.counts.error).toBe(1) + expect(spy.counts.consumed).toBe(0) + }) + + it('Increments retryLater counter', () => { + const spy = new HandlerSpy() + + spy.addProcessedMessage( + { processingResult: { status: 'retryLater' }, message: TEST_MESSAGE }, + undefined, + 'test.type', + ) + + expect(spy.counts.retryLater).toBe(1) + }) + + it('Increments published counter', () => { + const spy = new HandlerSpy() + + spy.addProcessedMessage( + { processingResult: { status: 'published' }, message: TEST_MESSAGE }, + undefined, + 'test.type', + ) + + expect(spy.counts.published).toBe(1) + }) + + it('Tracks multiple statuses correctly', () => { + const spy = new HandlerSpy() + + spy.addProcessedMessage( + { processingResult: { status: 'consumed' }, message: TEST_MESSAGE }, + undefined, + 'test.type', + ) + spy.addProcessedMessage( + { processingResult: { status: 'consumed' }, message: TEST_MESSAGE_2 }, + undefined, + 'test.type', + ) + spy.addProcessedMessage( + { processingResult: { status: 'error', errorReason: 'handlerError' }, message: null }, + 'err1', + TYPE_NOT_RESOLVED, + ) + spy.addProcessedMessage( + { processingResult: { status: 'retryLater' }, message: TEST_MESSAGE }, + undefined, + 'test.type', + ) + + expect(spy.counts).toEqual({ + consumed: 2, + published: 0, + retryLater: 1, + error: 1, + }) + }) + + it('Resets counts on clear', () => { + const spy = new HandlerSpy() + + spy.addProcessedMessage( + { processingResult: { status: 'consumed' }, message: TEST_MESSAGE }, + undefined, + 'test.type', + ) + spy.addProcessedMessage( + { processingResult: { status: 'error', errorReason: 'invalidMessage' }, message: null }, + 'abc', + TYPE_NOT_RESOLVED, + ) + + spy.clear() + + expect(spy.counts).toEqual({ + consumed: 0, + published: 0, + retryLater: 0, + error: 0, + }) + }) + + it('Returns a copy, not a reference', () => { + const spy = new HandlerSpy() + + const countsBefore = spy.counts + spy.addProcessedMessage( + { processingResult: { status: 'consumed' }, message: TEST_MESSAGE }, + undefined, + 'test.type', + ) + + expect(countsBefore.consumed).toBe(0) + expect(spy.counts.consumed).toBe(1) + }) + }) + describe('isHandlerSpy', () => { it('HandlerSpy returns true', () => { const spy = new HandlerSpy()