diff --git a/packages/metrics/README.md b/packages/metrics/README.md index d610a0fd..74d0ab1c 100644 --- a/packages/metrics/README.md +++ b/packages/metrics/README.md @@ -1,27 +1,225 @@ # Metrics -This packages contains utilities for collecting metrics in `@message-queue-toolkit` +This package contains utilities for collecting metrics in `@message-queue-toolkit`. + +## Installation + +```sh +npm install @message-queue-toolkit/metrics +``` + +## Overview + +All metrics implement the `MessageMetricsManager` interface from `@message-queue-toolkit/core`, which means they can be passed directly to any `AbstractQueueService` via the `messageMetricsManager` option. + +```ts +import { PrometheusMessageProcessingTimeMetric } from '@message-queue-toolkit/metrics' + +const metric = new PrometheusMessageProcessingTimeMetric({ + name: 'message_processing_duration_ms', + helpDescription: 'Time spent processing a message', + buckets: [10, 50, 100, 500, 1000], +}) + +// Pass to your queue service +const service = new MyQueueService({ messageMetricsManager: metric }) +``` + +--- ## Prometheus metrics -Metrics that use [Prometheus](https://prometheus.io/) toolkit and [prom-client](https://github.com/siimon/prom-client) library +All Prometheus metrics use [prom-client](https://github.com/siimon/prom-client) under the hood. + +### Base parameters + +All metrics accept `PrometheusMetricParams`: + +| Field | Type | Required | Description | +|---|---|---|---| +| `name` | `string` | yes | Prometheus metric name | +| `helpDescription` | `string` | yes | Prometheus metric description | +| `buckets` | `number[]` | histograms only | Histogram bucket boundaries | +| `messageVersion` | `string \| (metadata) => string \| undefined` | no | Static version string or function to extract version from message metadata | +| `labelNames` | `Labels[]` | when `Labels` is specified | Names of the custom labels to register. Must not overlap with `DefaultLabels` (`queue`, `messageType`, `version`, `result`) — TypeScript enforces this at compile time | + +An optional second argument accepts a custom `prom-client` instance (useful for testing or multi-registry setups). + +--- + +### Histogram metrics (time-based) + +Use `Histogram` to measure message timing. Base labels registered on every observation: + +| Label | Value | +|---|---| +| `messageType` | Message type identifier | +| `version` | Resolved message version | +| `queue` | Queue or topic name | +| `result` | Processing result status (`consumed`, `published`, `retryLater`, `error`) | + +#### Built-in implementations + +**`PrometheusMessageProcessingTimeMetric`** +Measures elapsed time from when processing started to when it ended. +``` +value = messageProcessingEndTimestamp - messageProcessingStartTimestamp +``` + +**`PrometheusMessageLifetimeMetric`** +Measures elapsed time from when the message was originally sent to when it was fully processed. Includes any time the message spent waiting in the queue. +``` +value = messageProcessingEndTimestamp - messageTimestamp +``` +Skips observation if `messageTimestamp` is not available. + +**`PrometheusMessageQueueTimeMetric`** +Measures elapsed time from when the message was originally sent to when processing started (i.e., queue wait time only). +``` +value = messageProcessingStartTimestamp - messageTimestamp +``` +Skips observation if `messageTimestamp` is not available. + +#### Custom histogram with extra labels + +Extend `PrometheusMessageTimeMetric` to add custom labels. Pass `labelNames` in the params and override `getLabelValuesForProcessedMessage`. Custom label names must not conflict with `DefaultLabels` — using a reserved name (e.g. `'result'`) will produce a TypeScript compile error: + +```ts +import { PrometheusMessageTimeMetric } from '@message-queue-toolkit/metrics' +import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' +import type { LabelValues } from 'prom-client' + +class MyProcessingTimeMetric extends PrometheusMessageTimeMetric { + protected calculateObservedValue(metadata: ProcessedMessageMetadata): number | null { + return metadata.messageProcessingEndTimestamp - metadata.messageProcessingStartTimestamp + } + + protected getLabelValuesForProcessedMessage(): LabelValues<'env'> { + return { env: process.env.NODE_ENV ?? 'unknown' } + } +} + +const metric = new MyProcessingTimeMetric({ + name: 'message_processing_duration_ms', + helpDescription: 'Processing time by environment', + buckets: [10, 50, 100, 500], + labelNames: ['env'], +}) +``` + +--- + +### Counter metrics (event-based) + +Use `Counter` to count message events. Base labels registered on every increment: + +| Label | Value | +|---|---| +| `messageType` | Message type identifier | +| `version` | Resolved message version | +| `queue` | Queue or topic name | +| `result` | Processing result status (`consumed`, `published`, `retryLater`, `error`) | + +#### Built-in implementations + +**`PrometheusMessageResultCounter`** +Counts all processed messages using only the built-in base labels. No extra configuration needed. + +```ts +import { PrometheusMessageResultCounter } from '@message-queue-toolkit/metrics' + +const metric = new PrometheusMessageResultCounter({ + name: 'messages_total', + helpDescription: 'Number of messages processed', +}) +``` + +**`PrometheusMessageErrorCounter`** +Counts only messages that result in an error. Adds an `errorReason` label. Skips all non-error messages. + +```ts +import { PrometheusMessageErrorCounter } from '@message-queue-toolkit/metrics' + +const metric = new PrometheusMessageErrorCounter({ + name: 'message_errors_total', + helpDescription: 'Number of messages that failed processing', + labelNames: ['errorReason'], +}) +``` + +#### Custom counter with extra labels + +Extend `PrometheusMessageCounter` and implement `calculateCount`. Override `getLabelValuesForProcessedMessage` when adding custom labels. Same as histograms, custom label names must not conflict with `DefaultLabels`: + +```ts +import { PrometheusMessageCounter } from '@message-queue-toolkit/metrics' +import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' +import type { LabelValues } from 'prom-client' + +class MyRegionCounter extends PrometheusMessageCounter { + protected calculateCount(): number | null { + return 1 + } + + protected override getLabelValuesForProcessedMessage( + metadata: ProcessedMessageMetadata, + ): LabelValues<'region'> { + return { region: metadata.message.region } + } +} + +const metric = new MyRegionCounter({ + name: 'messages_by_region_total', + helpDescription: 'Number of messages processed, by region', + labelNames: ['region'], +}) +``` + +When no custom labels are needed, omit `labelNames` and skip overriding `getLabelValuesForProcessedMessage`: + +```ts +class MyConsumedCounter extends PrometheusMessageCounter { + protected calculateCount(metadata: ProcessedMessageMetadata): number | null { + return metadata.processingResult.status === 'consumed' ? 1 : null + } +} + +const metric = new MyConsumedCounter({ + name: 'messages_consumed_total', + helpDescription: 'Number of successfully consumed messages', +}) +``` -### MessageProcessingPrometheusMetric -Abstract class implementing `MessageMetricsManager` interface, that can be injected into `AbstractQueueService` from `@message-queue-toolkit/core`. +--- -It uses [Histogram](https://prometheus.io/docs/concepts/metric_types/#histogram) metric to collect message processing times with labels: -- `messageType` - message type -- `version` - message version -- `queue` - name of the queue or topic -- `result` - processing result +### Using multiple metrics together -See [MessageProcessingPrometheusMetric.ts](lib/prometheus/MessageProcessingPrometheusMetric.ts) for available parameters. +`MessageMultiMetricManager` aggregates multiple `MessageMetricsManager` instances and fans out each `registerProcessedMessage` call to all of them. -There are following non-abstract implementations available: -- `MessageProcessingTimeMetric` - registers elapsed time from start to the end of message processing -- `MessageLifetimeMetric` - registers elapsed time from the point where message was initially sent, to the point when it was processed. -Note: if message is waiting in the queue due to high load or barrier, the waiting time is included in the measurement +```ts +import { + MessageMultiMetricManager, + PrometheusMessageProcessingTimeMetric, + PrometheusMessageResultCounter, + PrometheusMessageErrorCounter, +} from '@message-queue-toolkit/metrics' -### MessageProcessingMultiMetrics -Implementation of `MessageMetricsManager` that allows to use multiple `MessageProcessingPrometheusMetric` instances. +const metricsManager = new MessageMultiMetricManager([ + new PrometheusMessageProcessingTimeMetric({ + name: 'message_processing_duration_ms', + helpDescription: 'Message processing time', + buckets: [10, 50, 100, 500, 1000], + }), + new PrometheusMessageResultCounter({ + name: 'messages_total', + helpDescription: 'Messages processed', + }), + new PrometheusMessageErrorCounter({ + name: 'message_errors_total', + helpDescription: 'Messages that failed processing', + labelNames: ['errorReason'], + }), +]) +const service = new MyQueueService({ messageMetricsManager: metricsManager }) +``` diff --git a/packages/metrics/lib/index.ts b/packages/metrics/lib/index.ts index 797fc0b2..578f98b4 100644 --- a/packages/metrics/lib/index.ts +++ b/packages/metrics/lib/index.ts @@ -1,5 +1,7 @@ export * from './MessageMultiMetricManager.ts' +export * from './prometheus/metrics/message-error/PrometheusMessageCounter.ts' export * from './prometheus/metrics/message-error/PrometheusMessageErrorCounter.ts' +export * from './prometheus/metrics/message-error/PrometheusMessageResultCounter.ts' export * from './prometheus/metrics/message-time/PrometheusMessageLifetimeMetric.ts' export * from './prometheus/metrics/message-time/PrometheusMessageProcessingTimeMetric.ts' export * from './prometheus/metrics/message-time/PrometheusMessageQueueTimeMetric.ts' diff --git a/packages/metrics/lib/prometheus/PrometheusMessageMetric.ts b/packages/metrics/lib/prometheus/PrometheusMessageMetric.ts index 1e9e8d7a..191cf42d 100644 --- a/packages/metrics/lib/prometheus/PrometheusMessageMetric.ts +++ b/packages/metrics/lib/prometheus/PrometheusMessageMetric.ts @@ -9,8 +9,11 @@ import type { MessageVersionGeneratingFunction, PrometheusMetricParams } from '. export abstract class PrometheusMessageMetric< MessagePayload extends object, MetricType extends Metric, - MetricParams extends - PrometheusMetricParams = PrometheusMetricParams, + Labels extends string = never, + MetricParams extends PrometheusMetricParams = PrometheusMetricParams< + MessagePayload, + Labels + >, > implements MessageMetricsManager { /** Fallbacks to null if metrics are disabled on app level */ @@ -18,7 +21,7 @@ export abstract class PrometheusMessageMetric< protected readonly messageVersionGeneratingFunction: MessageVersionGeneratingFunction - private readonly metricParams: MetricParams + protected readonly metricParams: MetricParams /** * @param metricParams - metrics parameters (see PrometheusMetricParams) diff --git a/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageCounter.spec.ts b/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageCounter.spec.ts new file mode 100644 index 00000000..47fe260c --- /dev/null +++ b/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageCounter.spec.ts @@ -0,0 +1,145 @@ +import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' +import type { Counter, LabelValues } from 'prom-client' +import * as promClient from 'prom-client' +import { describe, expect, it, vi } from 'vitest' +import { PrometheusMessageCounter } from './PrometheusMessageCounter.ts' + +type TestMessage = { + id: string + messageType: 'test' +} + +// Concrete implementation with no custom labels +class TestCounter extends PrometheusMessageCounter { + protected calculateCount(metadata: ProcessedMessageMetadata): number | null { + return metadata.processingResult.status === 'consumed' ? 1 : null + } +} + +// Concrete implementation with custom labels +class TestCounterWithLabels extends PrometheusMessageCounter { + protected override getLabelValuesForProcessedMessage( + _metadata: ProcessedMessageMetadata, + ): LabelValues<'test'> { + return { test: 'test' } + } + + protected calculateCount(): number | null { + return 1 + } +} + +const mockCounterCalls = () => { + const counterCalls: { labels: Record; value?: number }[] = [] + vi.spyOn(promClient.register, 'getSingleMetric').mockReturnValue({ + inc(labels: Record, value?: number) { + counterCalls.push({ labels, value }) + }, + } as Counter) + return counterCalls +} + +describe('PrometheusMessageCounter', () => { + describe('labelNames', () => { + it('allows omitting labelNames when no custom Labels type is defined', () => { + // No TypeScript error and no runtime error when labelNames is omitted + expect( + () => new TestCounter({ name: 'test_counter', helpDescription: 'test' }, promClient), + ).not.toThrow() + }) + + it('registers only base labels when no custom labels are defined', () => { + // Given + const counterCalls = mockCounterCalls() + const metric = new TestCounter({ name: 'test_counter', helpDescription: 'test' }, promClient) + const message: TestMessage = { id: '1', messageType: 'test' } + + // When + metric.registerProcessedMessage({ + messageId: message.id, + messageType: message.messageType, + processingResult: { status: 'consumed' }, + message, + queueName: 'test-queue', + messageTimestamp: Date.now(), + messageProcessingStartTimestamp: Date.now(), + messageProcessingEndTimestamp: Date.now(), + }) + + // Then + expect(counterCalls).toMatchInlineSnapshot(` + [ + { + "labels": { + "messageType": "test", + "queue": "test-queue", + "result": "consumed", + "version": undefined, + }, + "value": 1, + }, + ] + `) + }) + }) + + it('skips increment when calculateCount returns null', () => { + // Given + const counterCalls = mockCounterCalls() + const metric = new TestCounter({ name: 'test_counter', helpDescription: 'test' }, promClient) + const message: TestMessage = { id: '1', messageType: 'test' } + + // When + metric.registerProcessedMessage({ + messageId: message.id, + messageType: message.messageType, + processingResult: { status: 'error', errorReason: 'invalidMessage' }, + message, + queueName: 'test-queue', + messageTimestamp: Date.now(), + messageProcessingStartTimestamp: Date.now(), + messageProcessingEndTimestamp: Date.now(), + }) + + // Then + expect(counterCalls).toHaveLength(0) + }) + + it('registers custom labels when Labels type is defined', () => { + // Given + const counterCalls = mockCounterCalls() + const metric = new TestCounterWithLabels( + { name: 'test_counter_labels', helpDescription: 'test', labelNames: ['test'] }, + promClient, + ) + const message: TestMessage = { id: '1', messageType: 'test' } + + // When + metric.registerProcessedMessage({ + messageId: message.id, + messageType: message.messageType, + processingResult: { status: 'consumed' }, + message, + queueName: 'test-queue', + messageTimestamp: Date.now(), + messageProcessingStartTimestamp: Date.now(), + messageProcessingEndTimestamp: Date.now(), + }) + + // Then + expect(counterCalls).toMatchInlineSnapshot(` + [ + { + "labels": { + "messageType": "test", + "queue": "test-queue", + "result": "consumed", + "test": "test", + "version": undefined, + }, + "value": 1, + }, + ] + `) + }) +}) diff --git a/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageCounter.ts b/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageCounter.ts new file mode 100644 index 00000000..7c9fa5fd --- /dev/null +++ b/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageCounter.ts @@ -0,0 +1,53 @@ +import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' +import type promClient from 'prom-client' +import type { Counter, LabelValues } from 'prom-client' +import { PrometheusMessageMetric } from '../../PrometheusMessageMetric.ts' +import type { DefaultLabels, PrometheusMetricParams } from '../../types.ts' + +export abstract class PrometheusMessageCounter< + MessagePayload extends object, + Labels extends string = never, +> extends PrometheusMessageMetric, Labels> { + protected createMetric( + client: typeof promClient, + metricParams: PrometheusMetricParams, + ): Counter { + return new client.Counter({ + name: metricParams.name, + help: metricParams.helpDescription, + labelNames: [ + 'queue', + 'messageType', + 'version', + 'result', + ...(this.metricParams.labelNames ?? []), + ], + }) + } + + registerProcessedMessage(metadata: ProcessedMessageMetadata): void { + const count = this.calculateCount(metadata) + if (count === null) return + + this.metric.inc( + { + queue: metadata.queueName, + messageType: metadata.messageType, + result: metadata.processingResult.status, + version: this.messageVersionGeneratingFunction(metadata), + ...this.getLabelValuesForProcessedMessage(metadata), + } as LabelValues, + count, + ) + } + + protected getLabelValuesForProcessedMessage( + _metadata: ProcessedMessageMetadata, + ): LabelValues { + return {} as LabelValues + } + + protected abstract calculateCount( + metadata: ProcessedMessageMetadata, + ): number | null +} diff --git a/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageErrorCounter.spec.ts b/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageErrorCounter.spec.ts index efe1ab8f..52ed7db3 100644 --- a/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageErrorCounter.spec.ts +++ b/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageErrorCounter.spec.ts @@ -21,6 +21,7 @@ describe('PrometheusMessageErrorCounter', () => { { name: 'test_metric', helpDescription: 'test description', + labelNames: ['errorReason'], messageVersion: (metadata: ProcessedMessageMetadata) => { registeredMessages.push(metadata) // Mocking it to check if value is registered properly return undefined @@ -84,6 +85,7 @@ describe('PrometheusMessageErrorCounter', () => { { name: 'Test metric', helpDescription: 'test description', + labelNames: ['errorReason'], }, promClient, ) @@ -118,6 +120,7 @@ describe('PrometheusMessageErrorCounter', () => { { name: 'Test metric', helpDescription: 'test description', + labelNames: ['errorReason'], }, promClient, ) @@ -148,6 +151,7 @@ describe('PrometheusMessageErrorCounter', () => { "errorReason": "handlerError", "messageType": "test", "queue": "test-queue", + "result": "error", "version": undefined, }, "value": 1, @@ -164,6 +168,7 @@ describe('PrometheusMessageErrorCounter', () => { { name: 'Test metric', helpDescription: 'test description', + labelNames: ['errorReason'], messageVersion: (metadata: ProcessedMessageMetadata) => metadata.message?.metadata?.schemaVersion, }, @@ -210,6 +215,7 @@ describe('PrometheusMessageErrorCounter', () => { "errorReason": "retryLaterExceeded", "messageType": "test", "queue": "test-queue", + "result": "error", "version": undefined, }, "value": 1, @@ -219,6 +225,7 @@ describe('PrometheusMessageErrorCounter', () => { "errorReason": "retryLaterExceeded", "messageType": "test", "queue": "test-queue", + "result": "error", "version": "1.0.0", }, "value": 1, diff --git a/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageErrorCounter.ts b/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageErrorCounter.ts index 32bc8c46..4da29992 100644 --- a/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageErrorCounter.ts +++ b/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageErrorCounter.ts @@ -1,37 +1,24 @@ import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' -import type promClient from 'prom-client' -import type { Counter } from 'prom-client' -import { PrometheusMessageMetric } from '../../PrometheusMessageMetric.ts' -import type { PrometheusMetricParams } from '../../types.ts' +import type { LabelValues } from 'prom-client' +import { PrometheusMessageCounter } from './PrometheusMessageCounter.ts' export class PrometheusMessageErrorCounter< MessagePayload extends object, -> extends PrometheusMessageMetric< - MessagePayload, - Counter<'queue' | 'messageType' | 'version' | 'errorReason'> -> { - protected createMetric( - client: typeof promClient, - metricParams: PrometheusMetricParams, - ): Counter { - return new client.Counter({ - name: metricParams.name, - help: metricParams.helpDescription, - labelNames: ['queue', 'messageType', 'version', 'errorReason'], - }) +> extends PrometheusMessageCounter { + protected override getLabelValuesForProcessedMessage( + metadata: ProcessedMessageMetadata, + ): LabelValues<'errorReason'> { + return { + errorReason: + metadata.processingResult.status === 'error' + ? metadata.processingResult.errorReason + : undefined, + } } - registerProcessedMessage(metadata: ProcessedMessageMetadata): void { - if (metadata.processingResult.status !== 'error') return + protected calculateCount(metadata: ProcessedMessageMetadata): number | null { + if (metadata.processingResult.status !== 'error') return null - this.metric.inc( - { - queue: metadata.queueName, - messageType: metadata.messageType, - errorReason: metadata.processingResult.errorReason, - version: this.messageVersionGeneratingFunction(metadata), - }, - 1, - ) + return 1 } } diff --git a/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageResultCounter.spec.ts b/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageResultCounter.spec.ts new file mode 100644 index 00000000..7951a34a --- /dev/null +++ b/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageResultCounter.spec.ts @@ -0,0 +1,144 @@ +import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' +import type { Counter } from 'prom-client' +import * as promClient from 'prom-client' +import { describe, expect, it, vi } from 'vitest' +import { PrometheusMessageResultCounter } from './PrometheusMessageResultCounter.ts' + +type TestMessage = { + id: string + messageType: 'test' + metadata?: { + schemaVersion: string + } +} + +const mockCounterCalls = () => { + const counterCalls: { labels: Record; value?: number }[] = [] + vi.spyOn(promClient.register, 'getSingleMetric').mockReturnValue({ + inc(labels: Record, value?: number) { + counterCalls.push({ labels, value }) + }, + } as Counter) + return counterCalls +} + +const buildMetadata = ( + message: TestMessage, + overrides?: Partial>, +): ProcessedMessageMetadata => ({ + messageId: message.id, + messageType: message.messageType, + processingResult: { status: 'consumed' }, + message, + queueName: 'test-queue', + messageTimestamp: Date.now(), + messageProcessingStartTimestamp: Date.now(), + messageProcessingEndTimestamp: Date.now(), + ...overrides, +}) + +describe('PrometheusMessageResultCounter', () => { + it.each([ + { status: 'consumed' }, + { status: 'published' }, + { status: 'retryLater' }, + { status: 'error', errorReason: 'handlerError' }, + ] as const)('registers resultStatus label for %o', (processingResult) => { + // Given + const counterCalls = mockCounterCalls() + const metric = new PrometheusMessageResultCounter( + { name: 'test_metric', helpDescription: 'test' }, + promClient, + ) + + // When + metric.registerProcessedMessage( + buildMetadata({ id: '1', messageType: 'test' }, { processingResult }), + ) + + // Then + expect(counterCalls).toHaveLength(1) + expect(counterCalls[0]?.labels).toMatchObject({ result: processingResult.status }) + expect(counterCalls[0]?.value).toBe(1) + }) + + it('registers base labels alongside resultStatus', () => { + // Given + const counterCalls = mockCounterCalls() + const metric = new PrometheusMessageResultCounter( + { name: 'test_metric', helpDescription: 'test' }, + promClient, + ) + + // When + metric.registerProcessedMessage( + buildMetadata( + { id: '1', messageType: 'test' }, + { queueName: 'my-queue', processingResult: { status: 'consumed' } }, + ), + ) + + // Then + expect(counterCalls).toMatchInlineSnapshot(` + [ + { + "labels": { + "messageType": "test", + "queue": "my-queue", + "result": "consumed", + "version": undefined, + }, + "value": 1, + }, + ] + `) + }) + + it('resolves version from message metadata', () => { + // Given + const counterCalls = mockCounterCalls() + const metric = new PrometheusMessageResultCounter( + { + name: 'test_metric', + helpDescription: 'test', + messageVersion: (metadata) => metadata.message?.metadata?.schemaVersion, + }, + promClient, + ) + const messages: TestMessage[] = [ + { id: '1', messageType: 'test' }, + { id: '2', messageType: 'test', metadata: { schemaVersion: '2.0.0' } }, + ] + + // When + for (const message of messages) { + metric.registerProcessedMessage( + buildMetadata(message, { processingResult: { status: 'consumed' } }), + ) + } + + // Then + expect(counterCalls).toMatchInlineSnapshot(` + [ + { + "labels": { + "messageType": "test", + "queue": "test-queue", + "result": "consumed", + "version": undefined, + }, + "value": 1, + }, + { + "labels": { + "messageType": "test", + "queue": "test-queue", + "result": "consumed", + "version": "2.0.0", + }, + "value": 1, + }, + ] + `) + }) +}) diff --git a/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageResultCounter.ts b/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageResultCounter.ts new file mode 100644 index 00000000..5f4eb3c3 --- /dev/null +++ b/packages/metrics/lib/prometheus/metrics/message-error/PrometheusMessageResultCounter.ts @@ -0,0 +1,9 @@ +import { PrometheusMessageCounter } from './PrometheusMessageCounter.ts' + +export class PrometheusMessageResultCounter< + MessagePayload extends object, +> extends PrometheusMessageCounter { + protected calculateCount(): number | null { + return 1 + } +} diff --git a/packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageTimeMetric.spec.ts b/packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageTimeMetric.spec.ts new file mode 100644 index 00000000..781783dc --- /dev/null +++ b/packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageTimeMetric.spec.ts @@ -0,0 +1,127 @@ +import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' +import type { Histogram, LabelValues } from 'prom-client' +import * as promClient from 'prom-client' +import { describe, expect, it, vi } from 'vitest' +import { PrometheusMessageTimeMetric } from './PrometheusMessageTimeMetric.ts' + +type TestMessage = { + id: string + messageType: 'test' +} + +// Concrete implementation with no custom labels +class TestTimeMetric extends PrometheusMessageTimeMetric { + protected calculateObservedValue(metadata: ProcessedMessageMetadata): number | null { + return metadata.messageProcessingEndTimestamp - metadata.messageProcessingStartTimestamp + } +} + +// Concrete implementation with custom labels +class TestTimeMetricWithLabels extends PrometheusMessageTimeMetric { + protected calculateObservedValue(metadata: ProcessedMessageMetadata): number | null { + return metadata.messageProcessingEndTimestamp - metadata.messageProcessingStartTimestamp + } + + protected override getLabelValuesForProcessedMessage(): LabelValues<'env'> { + return { env: 'production' } + } +} + +const mockObservedValues = () => { + const observedValues: { labels: Record; value: number }[] = [] + vi.spyOn(promClient.register, 'getSingleMetric').mockReturnValue({ + observe(labels: Record, value: number) { + observedValues.push({ labels, value }) + }, + } as Histogram) + return observedValues +} + +const buildMetadata = ( + message: TestMessage, + overrides?: Partial>, +): ProcessedMessageMetadata => ({ + messageId: message.id, + messageType: message.messageType, + processingResult: { status: 'consumed' }, + message, + queueName: 'test-queue', + messageTimestamp: Date.now(), + messageProcessingStartTimestamp: 1000, + messageProcessingEndTimestamp: 1100, + ...overrides, +}) + +describe('PrometheusMessageTimeMetric', () => { + describe('labelNames', () => { + it('allows omitting labelNames when no custom Labels type is defined', () => { + expect( + () => + new TestTimeMetric( + { name: 'test_histogram', helpDescription: 'test', buckets: [10, 50, 100] }, + promClient, + ), + ).not.toThrow() + }) + + it('registers only base labels when no custom labels are defined', () => { + // Given + const observedValues = mockObservedValues() + const metric = new TestTimeMetric( + { name: 'test_histogram', helpDescription: 'test', buckets: [10, 50, 100] }, + promClient, + ) + + // When + metric.registerProcessedMessage(buildMetadata({ id: '1', messageType: 'test' })) + + // Then + expect(observedValues).toMatchInlineSnapshot(` + [ + { + "labels": { + "messageType": "test", + "queue": "test-queue", + "result": "consumed", + "version": undefined, + }, + "value": 100, + }, + ] + `) + }) + }) + + it('registers custom labels alongside base labels', () => { + // Given + const observedValues = mockObservedValues() + const metric = new TestTimeMetricWithLabels( + { + name: 'test_histogram_labels', + helpDescription: 'test', + buckets: [10, 50, 100], + labelNames: ['env'], + }, + promClient, + ) + + // When + metric.registerProcessedMessage(buildMetadata({ id: '1', messageType: 'test' })) + + // Then + expect(observedValues).toMatchInlineSnapshot(` + [ + { + "labels": { + "env": "production", + "messageType": "test", + "queue": "test-queue", + "result": "consumed", + "version": undefined, + }, + "value": 100, + }, + ] + `) + }) +}) diff --git a/packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageTimeMetric.ts b/packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageTimeMetric.ts index 0dc29be9..ce01681e 100644 --- a/packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageTimeMetric.ts +++ b/packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageTimeMetric.ts @@ -1,28 +1,41 @@ -import type { MakeRequired } from '@lokalise/universal-ts-utils/node' import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' import type promClient from 'prom-client' -import type { Histogram } from 'prom-client' +import type { Histogram, LabelValues } from 'prom-client' import { PrometheusMessageMetric } from '../../PrometheusMessageMetric.ts' -import type { PrometheusMetricParams } from '../../types.ts' +import type { DefaultLabels, PrometheusMetricParams } from '../../types.ts' + +export type PrometheusMetricTimeParams< + MessagePayload extends object, + Labels extends string = never, +> = PrometheusMetricParams & { buckets: number[] } export abstract class PrometheusMessageTimeMetric< MessagePayload extends object, + Labels extends string = never, > extends PrometheusMessageMetric< MessagePayload, - Histogram<'messageType' | 'version' | 'queue' | 'result'>, - MakeRequired, 'buckets'> + Histogram, + Labels, + PrometheusMetricTimeParams > { protected createMetric( client: typeof promClient, - metricParams: MakeRequired, 'buckets'>, - ): Histogram<'messageType' | 'version' | 'queue' | 'result'> { + metricParams: PrometheusMetricTimeParams, + ): Histogram { return new client.Histogram({ name: metricParams.name, help: metricParams.helpDescription, buckets: metricParams.buckets, - labelNames: ['messageType', 'version', 'queue', 'result'], + labelNames: [ + 'messageType', + 'version', + 'queue', + 'result', + ...(this.metricParams.labelNames ?? []), + ], }) } + registerProcessedMessage(metadata: ProcessedMessageMetadata): void { const observedValue: number | null = this.calculateObservedValue(metadata) @@ -35,11 +48,18 @@ export abstract class PrometheusMessageTimeMetric< version: this.messageVersionGeneratingFunction(metadata), queue: metadata.queueName, result: metadata.processingResult.status, - }, + ...this.getLabelValuesForProcessedMessage(metadata), + } as LabelValues, observedValue, ) } + protected getLabelValuesForProcessedMessage( + _metadata: ProcessedMessageMetadata, + ): LabelValues { + return {} as LabelValues + } + protected abstract calculateObservedValue( metadata: ProcessedMessageMetadata, ): number | null diff --git a/packages/metrics/lib/prometheus/types.ts b/packages/metrics/lib/prometheus/types.ts index 3d21b43c..24180488 100644 --- a/packages/metrics/lib/prometheus/types.ts +++ b/packages/metrics/lib/prometheus/types.ts @@ -3,7 +3,7 @@ import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' /** * Parameters used for registering message processing metrics in Prometheus */ -export type PrometheusMetricParams = { +export type PrometheusMetricParams = { /** * Prometheus metric name */ @@ -23,8 +23,16 @@ export type PrometheusMetricParams = { * Message version used as a label - can be static string or method resolving version based on payload */ messageVersion?: string | MessageVersionGeneratingFunction -} +} & LabelNames export type MessageVersionGeneratingFunction = ( messageMetadata: ProcessedMessageMetadata, ) => string | undefined + +export type DefaultLabels = 'queue' | 'messageType' | 'version' | 'result' + +type LabelNames = [Labels] extends [never] + ? { labelNames?: never[] } + : [Extract] extends [never] + ? { labelNames: Labels[] } + : never