diff --git a/.changeset/adr-0030-p1-reliable-delivery.md b/.changeset/adr-0030-p1-reliable-delivery.md new file mode 100644 index 000000000..e7de8edb0 --- /dev/null +++ b/.changeset/adr-0030-p1-reliable-delivery.md @@ -0,0 +1,36 @@ +--- +"@objectstack/service-messaging": minor +--- + +ADR-0030 P1 — reliable delivery + RecipientResolver. + +**RecipientResolver** — the single home for audience → user-id expansion, wired +into `MessagingService.emit()`. Queries the same identity/membership model +`plugin-sharing` uses (directly via the data engine, no backward plugin +dependency): +- `role:` → `sys_member` rows (tenant-scoped) +- `team:` → `sys_team_member` rows +- `owner_of::` / `{ ownerOf }` → the record's owner/assignee field +- `` → `sys_user` (verbatim fallback on miss); `user:` / bare id → id + +Best-effort: a failed directory lookup yields 0 recipients for that spec rather +than throwing. The inbox channel's email→id fallback moved here — the channel +now keys rows by the already-resolved recipient. + +**Reliable delivery outbox + dispatcher** (mirrors `plugin-webhooks`): +- New `sys_notification_delivery` outbox object (L4) — one row per + `(event × recipient × channel)`; `pending|in_flight|success|failed|dead|suppressed` + state machine; unique `(notification_id, recipient_id, channel)` enqueue dedup. +- `INotificationOutbox` with `SqlNotificationOutbox` + `MemoryNotificationOutbox` + backends; atomic claim (`pending → in_flight`) + stale-in_flight reaping. +- `NotificationDispatcher` — interval loop over partitions, each guarded by a + per-partition cluster lock (single-node always-grant fallback when no cluster + service); sends via the channel and acks with exponential backoff + jitter; + dead-letters once the retry budget is exhausted. +- `emit()` enqueues `pending` deliveries when an outbox is attached; otherwise it + fans out inline (the P0 behavior). `MessagingServicePlugin` wires the outbox + + dispatcher at `kernel:ready` and registers the new object. + +A failed channel send now retries and is observable on the delivery row; +duplicate enqueue is idempotent. Backoff/classification and clocks are injectable +for deterministic tests. diff --git a/packages/services/service-messaging/src/backoff.ts b/packages/services/service-messaging/src/backoff.ts new file mode 100644 index 000000000..b33bbd0ac --- /dev/null +++ b/packages/services/service-messaging/src/backoff.ts @@ -0,0 +1,67 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import type { AckResult } from './outbox.js'; +import type { SendResult, ErrorClass } from './channel.js'; + +/** + * Stable, framework-free partition hash (32-bit FNV-1a). Both the dispatcher + * and the outbox `claim()` filter on it, so it must be a single shared helper. + * Same implementation as `plugin-webhooks`. + */ +export function hashPartition(key: string, count: number): number { + if (count <= 0) throw new Error('partition count must be > 0'); + let h = 0x811c9dc5; + for (let i = 0; i < key.length; i++) { + h ^= key.charCodeAt(i); + h = Math.imul(h, 0x01000193); + } + return Math.abs(h | 0) % count; +} + +/** + * Exponential retry schedule with jitter. Returns the delay (ms) before the + * next attempt given how many attempts have already happened, or `null` once + * the budget is exhausted (→ dead). + * + * attempt 1 fails → ~1s · 2 → ~10s · 3 → ~1m · 4 → ~10m · 5 → ~1h · 6+ → dead + */ +export function nextRetryDelayMs(attemptsSoFar: number, rng: () => number = Math.random): number | null { + const SCHEDULE = [1_000, 10_000, 60_000, 600_000, 3_600_000]; + if (attemptsSoFar < 1 || attemptsSoFar > SCHEDULE.length) return null; + const base = SCHEDULE[attemptsSoFar - 1]; + const jitter = 0.8 + rng() * 0.4; // ∈ [0.8, 1.2) + return Math.floor(base * jitter); +} + +/** + * Turn a channel `send()` outcome into an {@link AckResult}, applying the retry + * schedule on retriable failures. + * + * - `ok` → success. + * - `errorClass` of `permanent` → dead immediately (no point retrying). + * - `errorClass` of `invalid_recipient` → suppressed (not our transport's fault). + * - otherwise (retryable / unknown) → schedule a retry, or dead once the budget + * is exhausted. + */ +export function classifyDeliveryAttempt( + result: SendResult, + errorClass: ErrorClass | undefined, + attemptsSoFar: number, + now: number = Date.now(), + rng?: () => number, +): AckResult { + if (result.ok) return { success: true }; + + if (errorClass === 'invalid_recipient') { + return { success: false, error: result.error, suppressed: true }; + } + if (errorClass === 'permanent') { + return { success: false, error: result.error, dead: true }; + } + + const delay = nextRetryDelayMs(attemptsSoFar + 1, rng); + if (delay === null) { + return { success: false, error: result.error, dead: true }; + } + return { success: false, error: result.error, nextAttemptAt: now + delay }; +} diff --git a/packages/services/service-messaging/src/dispatcher.test.ts b/packages/services/service-messaging/src/dispatcher.test.ts new file mode 100644 index 000000000..15f1df318 --- /dev/null +++ b/packages/services/service-messaging/src/dispatcher.test.ts @@ -0,0 +1,157 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect } from 'vitest'; +import { MemoryNotificationOutbox } from './memory-outbox.js'; +import { NotificationDispatcher } from './dispatcher.js'; +import { classifyDeliveryAttempt, nextRetryDelayMs } from './backoff.js'; +import type { MessagingChannel, SendResult } from './channel.js'; + +function silentCtx() { + return { logger: { info: () => {}, warn: () => {}, error: () => {} } }; +} + +/** A channel whose send() outcome is scripted per call. */ +function scriptedChannel(id: string, results: SendResult[]): { channel: MessagingChannel; calls: number } { + const state = { calls: 0 }; + const channel: MessagingChannel = { + id, + async send() { + const r = results[Math.min(state.calls, results.length - 1)]; + state.calls += 1; + return r; + }, + classifyError: () => 'retryable', + }; + return { channel, get calls() { return state.calls; } } as any; +} + +function dispatcher( + outbox: MemoryNotificationOutbox, + channels: MessagingChannel[], + rng = () => 0.5, + now?: () => number, +) { + const registry = { + getChannel: (cid: string) => channels.find((c) => c.id === cid), + }; + return new NotificationDispatcher({ + nodeId: 'node-test', + outbox, + channels: registry, + channelContext: silentCtx(), + rng, + now, + intervalMs: 10_000, // we drive ticks manually + }); +} + +describe('nextRetryDelayMs', () => { + it('follows the schedule and dead-letters after the budget', () => { + expect(nextRetryDelayMs(1, () => 0)).toBe(800); // 1000 * 0.8 + expect(nextRetryDelayMs(5, () => 0)).toBe(2_880_000); // 3_600_000 * 0.8 + expect(nextRetryDelayMs(6)).toBeNull(); // exhausted + expect(nextRetryDelayMs(0)).toBeNull(); + }); +}); + +describe('classifyDeliveryAttempt', () => { + it('success short-circuits', () => { + expect(classifyDeliveryAttempt({ ok: true }, undefined, 0)).toEqual({ success: true }); + }); + it('permanent → dead, invalid_recipient → suppressed', () => { + expect(classifyDeliveryAttempt({ ok: false, error: 'x' }, 'permanent', 0)).toMatchObject({ dead: true }); + expect(classifyDeliveryAttempt({ ok: false, error: 'x' }, 'invalid_recipient', 0)).toMatchObject({ suppressed: true }); + }); + it('retryable schedules nextAttemptAt until the budget is exhausted', () => { + const r = classifyDeliveryAttempt({ ok: false, error: 'x' }, 'retryable', 0, 1000, () => 0); + expect(r).toMatchObject({ success: false, nextAttemptAt: 1800 }); + const dead = classifyDeliveryAttempt({ ok: false, error: 'x' }, 'retryable', 5, 1000, () => 0); + expect(dead).toMatchObject({ dead: true }); + }); +}); + +describe('NotificationDispatcher', () => { + it('delivers a pending row through its channel and marks it success', async () => { + const outbox = new MemoryNotificationOutbox(1); + const { channel } = scriptedChannel('inbox', [{ ok: true, externalId: 'inbox_1' }]); + const seen: any[] = []; + const sendCh: MessagingChannel = { + id: 'inbox', + async send(_ctx, d) { seen.push(d); return { ok: true }; }, + }; + await outbox.enqueue({ notificationId: 'e1', recipientId: 'u1', channel: 'inbox', payload: { title: 'Hi', body: 'there' } }); + + const d = dispatcher(outbox, [sendCh]); + await d.tick(); + + expect(seen).toHaveLength(1); + expect(seen[0].recipient).toBe('u1'); + expect(seen[0].notification.title).toBe('Hi'); + const rows = await outbox.list(); + expect(rows[0].status).toBe('success'); + expect(rows[0].attempts).toBe(1); + void channel; + }); + + it('retries a failed send (status back to pending with a future nextAttemptAt)', async () => { + const outbox = new MemoryNotificationOutbox(1); + const failing: MessagingChannel = { + id: 'inbox', + async send() { return { ok: false, error: 'db down' }; }, + classifyError: () => 'retryable', + }; + await outbox.enqueue({ notificationId: 'e1', recipientId: 'u1', channel: 'inbox', payload: { title: 'Hi' } }); + + const d = dispatcher(outbox, [failing]); + await d.tick(); + + const [row] = await outbox.list(); + expect(row.status).toBe('pending'); + expect(row.attempts).toBe(1); + expect(row.nextAttemptAt).toBeGreaterThan(Date.now()); + expect(row.error).toBe('db down'); + }); + + it('dead-letters a row whose channel is not registered', async () => { + const outbox = new MemoryNotificationOutbox(1); + await outbox.enqueue({ notificationId: 'e1', recipientId: 'u1', channel: 'sms', payload: { title: 'Hi' } }); + const d = dispatcher(outbox, []); // no channels + await d.tick(); + const [row] = await outbox.list(); + expect(row.status).toBe('dead'); + expect(row.error).toContain("channel 'sms' not registered"); + }); + + it('eventually dead-letters after the retry budget is exhausted', async () => { + // Shared injectable clock: advance past the largest backoff each round so + // the row is ready, deterministically, without real timers. + let t = 1_000; + const clock = () => t; + const outbox = new MemoryNotificationOutbox(1, clock); + const failing: MessagingChannel = { + id: 'inbox', + async send() { return { ok: false, error: 'always fails' }; }, + classifyError: () => 'retryable', + }; + const id = await outbox.enqueue({ notificationId: 'e1', recipientId: 'u1', channel: 'inbox', payload: { title: 'Hi' } }); + const d = dispatcher(outbox, [failing], () => 0.5, clock); + + // 6 attempts: 5 scheduled retries then dead. + for (let i = 0; i < 6; i++) { + t += 5_000_000; // > max backoff (3.6M × 1.2) + await d.tick(); + } + const [row] = await outbox.list(); + expect(row.status).toBe('dead'); + expect(row.attempts).toBe(6); + expect(row.id).toBe(id); + }); + + it('dedups enqueue on (notification, recipient, channel)', async () => { + const outbox = new MemoryNotificationOutbox(1); + const a = await outbox.enqueue({ notificationId: 'e1', recipientId: 'u1', channel: 'inbox', payload: {} }); + const b = await outbox.enqueue({ notificationId: 'e1', recipientId: 'u1', channel: 'inbox', payload: {} }); + expect(a).toBe(b); + expect(await outbox.list()).toHaveLength(1); + }); +}); diff --git a/packages/services/service-messaging/src/dispatcher.ts b/packages/services/service-messaging/src/dispatcher.ts new file mode 100644 index 000000000..d4e557077 --- /dev/null +++ b/packages/services/service-messaging/src/dispatcher.ts @@ -0,0 +1,231 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import type { MessagingChannel, MessagingChannelContext, Notification, SendResult } from './channel.js'; +import type { INotificationOutbox, NotificationDeliveryRecord } from './outbox.js'; +import { classifyDeliveryAttempt } from './backoff.js'; + +/** Minimal channel-registry surface the dispatcher needs (MessagingService satisfies it). */ +export interface ChannelRegistry { + getChannel(id: string): MessagingChannel | undefined; +} + +/** A held lock; `release()` frees it, `isHeld()`/`renew()` mirror the cluster API. */ +export interface DispatchLockHandle { + release(): Promise | void; + isHeld?(): boolean; + renew?(ttlMs: number): Promise | void; +} + +/** Just the slice of `IClusterService` the dispatcher uses. */ +export interface DispatchCluster { + lock: { + acquire(key: string, opts: { ttlMs: number; waitMs: number }): Promise; + }; +} + +/** + * Single-node fallback lock — always grants. Used when no cluster service is + * registered, so the dispatcher runs correctly (just without cross-node + * coordination). The per-partition serialization a single process needs is + * already provided by the `inflightTick` guard + the outbox's atomic claim. + */ +const SINGLE_NODE_CLUSTER: DispatchCluster = { + lock: { + async acquire() { + return { release() {}, isHeld: () => true, renew() {} }; + }, + }, +}; + +export interface NotificationDispatcherLogger { + warn: (msg: string, meta?: any) => void; + info?: (msg: string, meta?: any) => void; +} + +export interface NotificationDispatcherOptions { + nodeId: string; + outbox: INotificationOutbox; + channels: ChannelRegistry; + /** Context handed to each channel's `send()` (logger). */ + channelContext: MessagingChannelContext; + /** Cross-node coordination. Defaults to a single-node always-grant lock. */ + cluster?: DispatchCluster; + partitionCount?: number; + batchSize?: number; + intervalMs?: number; + lockTtlMs?: number; + claimTtlMs?: number; + rng?: () => number; + /** Injectable clock (ms) for deterministic tests. Defaults to Date.now. */ + now?: () => number; + logger?: NotificationDispatcherLogger; + /** Observability hook fired after every attempt. */ + onAttempt?: (delivery: NotificationDeliveryRecord, success: boolean) => void; +} + +/** + * NotificationDispatcher (ADR-0030 P1) — drains the `sys_notification_delivery` + * outbox and sends each row through its channel, retrying with backoff and + * dead-lettering once the budget is exhausted. Structurally mirrors + * `WebhookDispatcher`: an interval loop walks `partitionCount` partitions, each + * guarded by a per-partition cluster lock; within a held partition it claims a + * batch (`pending → in_flight`), sends, and acks. + * + * At-least-once: if a channel send succeeds but the ack write fails, the row + * reverts to pending after the claim TTL and is re-sent — the inbox channel's + * receipt write is idempotent-friendly, and downstream channels should be too. + */ +export class NotificationDispatcher { + private readonly opts: Required< + Omit + > & + Pick & { cluster: DispatchCluster }; + private timer: ReturnType | undefined; + private running = false; + private inflightTick: Promise | undefined; + + constructor(options: NotificationDispatcherOptions) { + const intervalMs = options.intervalMs ?? 500; + const lockTtlMs = options.lockTtlMs ?? intervalMs * 5; + this.opts = { + nodeId: options.nodeId, + outbox: options.outbox, + channels: options.channels, + channelContext: options.channelContext, + cluster: options.cluster ?? SINGLE_NODE_CLUSTER, + partitionCount: options.partitionCount ?? 8, + batchSize: options.batchSize ?? 32, + intervalMs, + lockTtlMs, + claimTtlMs: options.claimTtlMs ?? lockTtlMs * 2, + rng: options.rng, + now: options.now, + logger: options.logger, + onAttempt: options.onAttempt, + }; + } + + /** Begin the periodic loop. Idempotent. */ + start(): void { + if (this.running) return; + this.running = true; + this.scheduleTick(); + this.timer = setInterval(() => this.scheduleTick(), this.opts.intervalMs); + // Don't keep the event loop alive solely for the dispatcher. + (this.timer as { unref?: () => void })?.unref?.(); + } + + /** Stop the loop and drain the in-flight tick. */ + async stop(): Promise { + if (!this.running) return; + this.running = false; + if (this.timer) { + clearInterval(this.timer); + this.timer = undefined; + } + if (this.inflightTick) { + try { await this.inflightTick; } catch { /* already logged */ } + } + } + + /** Run one full tick (all partitions). Exposed for deterministic tests. */ + async tick(): Promise { + await this.runTick(); + } + + private scheduleTick(): void { + if (this.inflightTick) return; + this.inflightTick = this.runTick() + .catch((err) => { + this.opts.logger?.warn?.('notification-dispatcher: tick failed', { + nodeId: this.opts.nodeId, + error: (err as Error)?.message ?? String(err), + }); + }) + .finally(() => { this.inflightTick = undefined; }); + } + + private async runTick(): Promise { + const count = this.opts.partitionCount; + const offset = stableNodeOffset(this.opts.nodeId, count); + for (let step = 0; step < count; step++) { + await this.runPartition((offset + step) % count); + } + } + + private async runPartition(index: number): Promise { + const handle = await this.opts.cluster.lock.acquire(`notify.dispatcher.partition.${index}`, { + ttlMs: this.opts.lockTtlMs, + waitMs: 0, + }); + if (!handle) return; + try { + const claimed = await this.opts.outbox.claim({ + nodeId: this.opts.nodeId, + limit: this.opts.batchSize, + partition: { index, count: this.opts.partitionCount }, + claimTtlMs: this.opts.claimTtlMs, + }); + if (claimed.length === 0) return; + await handle.renew?.(this.opts.lockTtlMs); + for (const row of claimed) { + if (handle.isHeld && !handle.isHeld()) break; + await this.processRow(row); + } + } finally { + await handle.release(); + } + } + + private async processRow(row: NotificationDeliveryRecord): Promise { + const channel = this.opts.channels.getChannel(row.channel); + if (!channel) { + // No transport for this channel → terminal, observable on the row. + await this.opts.outbox.ack(row.id, { + success: false, + error: `channel '${row.channel}' not registered`, + dead: true, + }); + this.opts.onAttempt?.(row, false); + return; + } + + const p = row.payload ?? {}; + const notification: Notification = { + notificationId: row.notificationId, + organizationId: row.organizationId, + topic: row.topic, + title: typeof p.title === 'string' ? p.title : row.topic ?? '', + body: typeof p.body === 'string' ? p.body : '', + severity: (p.severity as Notification['severity']) ?? 'info', + recipients: [row.recipientId], + channels: [row.channel], + actionUrl: typeof p.actionUrl === 'string' ? p.actionUrl : undefined, + payload: p, + }; + + let result: SendResult; + try { + result = await channel.send(this.opts.channelContext, { + notification, + channel: row.channel, + recipient: row.recipientId, + }); + } catch (err) { + result = { ok: false, error: (err as Error)?.message ?? String(err) }; + } + + const errorClass = !result.ok && channel.classifyError ? channel.classifyError(result.error) : undefined; + const now = this.opts.now?.() ?? Date.now(); + const ack = classifyDeliveryAttempt(result, errorClass, row.attempts, now, this.opts.rng); + await this.opts.outbox.ack(row.id, ack); + this.opts.onAttempt?.(row, result.ok); + } +} + +/** Spread the starting partition per node so contention rotates fairly. */ +function stableNodeOffset(nodeId: string, partitionCount: number): number { + let h = 0; + for (let i = 0; i < nodeId.length; i++) h = (h * 31 + nodeId.charCodeAt(i)) | 0; + return Math.abs(h) % partitionCount; +} diff --git a/packages/services/service-messaging/src/inbox-channel.test.ts b/packages/services/service-messaging/src/inbox-channel.test.ts index 54eb2e004..2f1560780 100644 --- a/packages/services/service-messaging/src/inbox-channel.test.ts +++ b/packages/services/service-messaging/src/inbox-channel.test.ts @@ -158,51 +158,14 @@ describe('inbox channel', () => { expect(ch.classifyError?.(new Error('x'))).toBe('retryable'); }); - // ── email → user id resolution (notify-by-email lands in the right inbox) ── - - it('resolves an email-shaped recipient to its sys_user id', async () => { - const data = fakeData(undefined, (obj, _q) => - obj === 'sys_user' ? { id: 'usr_abc123' } : null, - ); - const ch = createInboxChannel({ getData: () => data.engine }); - - await ch.send(silentCtx(), delivery({}, 'ada@example.com')); - - expect(data.findOnes).toHaveLength(1); - expect(data.findOnes[0].object).toBe('sys_user'); - expect(data.findOnes[0].query).toEqual({ where: { email: 'ada@example.com' }, fields: ['id'] }); - expect(data.inserts[0].row.user_id).toBe('usr_abc123'); - }); - - it('honours a userObject override for resolution', async () => { - const data = fakeData(undefined, () => ({ id: 'usr_xyz' })); - const ch = createInboxChannel({ getData: () => data.engine, userObject: 'crm_contact' }); - await ch.send(silentCtx(), delivery({}, 'ada@example.com')); - expect(data.findOnes[0].object).toBe('crm_contact'); - expect(data.inserts[0].row.user_id).toBe('usr_xyz'); - }); - - it('keys by the recipient verbatim when it is not email-shaped (no lookup)', async () => { + // Recipients arrive pre-resolved to user ids (RecipientResolver, ADR-0030 + // P1) — the channel keys the row by `recipient` verbatim and does NOT do + // its own identity lookup. + it('keys the inbox row by the recipient verbatim, with no user lookup', async () => { const data = fakeData(); const ch = createInboxChannel({ getData: () => data.engine }); await ch.send(silentCtx(), delivery({}, 'usr_42')); expect(data.findOnes).toHaveLength(0); expect(data.inserts[0].row.user_id).toBe('usr_42'); }); - - it('falls back to the email verbatim when no user matches', async () => { - const data = fakeData(undefined, () => null); - const ch = createInboxChannel({ getData: () => data.engine }); - await ch.send(silentCtx(), delivery({}, 'ghost@example.com')); - expect(data.findOnes).toHaveLength(1); - expect(data.inserts[0].row.user_id).toBe('ghost@example.com'); - }); - - it('falls back to the email verbatim when the lookup throws', async () => { - const data = fakeData(undefined, () => { throw new Error('user table locked'); }); - const ch = createInboxChannel({ getData: () => data.engine }); - const result = await ch.send(silentCtx(), delivery({}, 'ada@example.com')); - expect(result.ok).toBe(true); - expect(data.inserts[0].row.user_id).toBe('ada@example.com'); - }); }); diff --git a/packages/services/service-messaging/src/inbox-channel.ts b/packages/services/service-messaging/src/inbox-channel.ts index 57c64f32a..7b050c36d 100644 --- a/packages/services/service-messaging/src/inbox-channel.ts +++ b/packages/services/service-messaging/src/inbox-channel.ts @@ -15,12 +15,6 @@ export const INBOX_OBJECT = 'sys_inbox_message'; /** The receipt object the inbox channel writes a `delivered` row to (ADR-0030). */ export const RECEIPT_OBJECT = 'sys_notification_receipt'; -/** The user identity object an email-shaped recipient is resolved against. */ -export const USER_OBJECT = 'sys_user'; - -/** Cheap RFC-ish heuristic — "looks like an email" so we attempt id resolution. */ -const EMAIL_SHAPE = /^[^\s@]+@[^\s@]+\.[^\s@]+$/; - export interface InboxChannelOptions { /** * Resolve the runtime data engine. Returns `undefined` when no data layer @@ -33,15 +27,6 @@ export interface InboxChannelOptions { objectName?: string; /** Receipt object name override (default {@link RECEIPT_OBJECT}). */ receiptObject?: string; - /** - * User identity object used to resolve an email-shaped recipient to its - * id (default {@link USER_OBJECT}). The inbox is keyed by user id, but - * flows commonly address recipients by email (e.g. an `assignee` field), - * so a recipient matching {@link EMAIL_SHAPE} is looked up by `email` and - * replaced with the matching user's `id`. Verbatim fallback applies when - * the recipient is not email-shaped, no user matches, or the lookup fails. - */ - userObject?: string; /** Clock injection for deterministic tests. Defaults to `new Date()`. */ now?(): string; } @@ -53,43 +38,16 @@ export interface InboxChannelOptions { * call — we write a `sys_inbox_message` row in our own DB and the user's client * pulls it. So it needs no connector/transport. One delivery → one row keyed by * the recipient user id. + * + * Recipients arrive already resolved to user ids by the `RecipientResolver` + * (ADR-0030 P1) — the email→id fallback that used to live here moved up to the + * single resolution home, so the channel just keys the row by `recipient`. */ export function createInboxChannel(opts: InboxChannelOptions): MessagingChannel { const objectName = opts.objectName ?? INBOX_OBJECT; const receiptObject = opts.receiptObject ?? RECEIPT_OBJECT; - const userObject = opts.userObject ?? USER_OBJECT; const now = opts.now ?? (() => new Date().toISOString()); - /** - * Map an email-shaped recipient to its user id; return the recipient - * unchanged for non-email recipients, on no match, or on any lookup error - * (the inbox row is best-effort keyed and must never fail on resolution). - */ - async function resolveRecipient( - ctx: MessagingChannelContext, - data: IDataEngine, - recipient: string, - ): Promise { - if (!EMAIL_SHAPE.test(recipient)) return recipient; - try { - const user = await data.findOne(userObject, { - where: { email: recipient }, - fields: ['id'], - }); - const id = user?.id; - if (id != null && String(id).length > 0) return String(id); - ctx.logger.warn( - `[inbox] no '${userObject}' matched email '${recipient}'; keying inbox row by the email verbatim`, - ); - return recipient; - } catch (err) { - ctx.logger.warn( - `[inbox] failed to resolve '${recipient}' to a user id (${(err as Error).message}); keying by the email verbatim`, - ); - return recipient; - } - } - /** * Write the `delivered` receipt for an inbox materialization. Best-effort: * receipts are the read-state spine but a failure here must never turn a @@ -134,7 +92,7 @@ export function createInboxChannel(opts: InboxChannelOptions): MessagingChannel return { ok: true }; } - const userId = await resolveRecipient(ctx, data, delivery.recipient); + const userId = delivery.recipient; const at = now(); const row: Record = { diff --git a/packages/services/service-messaging/src/index.ts b/packages/services/service-messaging/src/index.ts index 1b7c169bd..9b857ea23 100644 --- a/packages/services/service-messaging/src/index.ts +++ b/packages/services/service-messaging/src/index.ts @@ -29,6 +29,19 @@ export type { MessagingServiceContext, } from './messaging-service.js'; +// Recipient resolution (ADR-0030 P1) +export { + RecipientResolver, + USER_OBJECT, + MEMBER_OBJECT, + TEAM_MEMBER_OBJECT, +} from './recipient-resolver.js'; +export type { + RecipientResolverOptions, + RecipientResolverLogger, + ResolveContext, +} from './recipient-resolver.js'; + // Channel seam export { createInboxChannel, INBOX_OBJECT, RECEIPT_OBJECT } from './inbox-channel.js'; export type { InboxChannelOptions } from './inbox-channel.js'; @@ -41,5 +54,27 @@ export type { ErrorClass, } from './channel.js'; +// Reliable delivery — outbox + dispatcher (ADR-0030 P1) +export type { + INotificationOutbox, + NotificationDeliveryRecord, + DeliveryStatus, + DeliveryPayload, + EnqueueDeliveryInput, + ClaimOptions, + AckResult, +} from './outbox.js'; +export { SqlNotificationOutbox, DELIVERY_OBJECT } from './sql-outbox.js'; +export type { SqlNotificationOutboxOptions } from './sql-outbox.js'; +export { MemoryNotificationOutbox } from './memory-outbox.js'; +export { hashPartition, nextRetryDelayMs, classifyDeliveryAttempt } from './backoff.js'; +export { NotificationDispatcher } from './dispatcher.js'; +export type { + NotificationDispatcherOptions, + ChannelRegistry, + DispatchCluster, + DispatchLockHandle, +} from './dispatcher.js'; + // Objects (metadata definitions) -export { InboxMessage, NotificationReceipt } from './objects/index.js'; +export { InboxMessage, NotificationReceipt, NotificationDelivery } from './objects/index.js'; diff --git a/packages/services/service-messaging/src/memory-outbox.ts b/packages/services/service-messaging/src/memory-outbox.ts new file mode 100644 index 000000000..be6e7f096 --- /dev/null +++ b/packages/services/service-messaging/src/memory-outbox.ts @@ -0,0 +1,115 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { randomUUID } from 'node:crypto'; +import type { + AckResult, + ClaimOptions, + DeliveryStatus, + EnqueueDeliveryInput, + INotificationOutbox, + NotificationDeliveryRecord, +} from './outbox.js'; +import { hashPartition } from './backoff.js'; + +/** + * In-memory {@link INotificationOutbox} — the test/minimal-stack backend. + * Single-process, so `claim()` is trivially atomic (no DB round-trips). Same + * semantics as {@link SqlNotificationOutbox} for dedup, reaping, and acks. + */ +export class MemoryNotificationOutbox implements INotificationOutbox { + private readonly rows = new Map(); + + constructor( + private readonly partitionCount = 8, + /** Injectable clock (ms) for deterministic tests. Defaults to Date.now. */ + private readonly clock: () => number = () => Date.now(), + ) {} + + async enqueue(input: EnqueueDeliveryInput): Promise { + for (const r of this.rows.values()) { + if ( + r.notificationId === input.notificationId && + r.recipientId === input.recipientId && + r.channel === input.channel + ) { + return r.id; // dedup + } + } + const id = randomUUID(); + const now = this.clock(); + this.rows.set(id, { + id, + notificationId: input.notificationId, + recipientId: input.recipientId, + channel: input.channel, + topic: input.topic, + payload: input.payload ?? {}, + organizationId: input.organizationId, + partitionKey: hashPartition(input.notificationId, this.partitionCount), + status: 'pending', + attempts: 0, + createdAt: now, + updatedAt: now, + }); + return id; + } + + async claim(opts: ClaimOptions): Promise { + const now = opts.now ?? this.clock(); + // Reap stale in_flight. + for (const r of this.rows.values()) { + if (r.status === 'in_flight' && (r.claimedAt ?? 0) < now - opts.claimTtlMs) { + r.status = 'pending'; + r.claimedBy = undefined; + r.claimedAt = undefined; + r.updatedAt = now; + } + } + const out: NotificationDeliveryRecord[] = []; + for (const r of this.rows.values()) { + if (out.length >= opts.limit) break; + if (r.status !== 'pending') continue; + if (opts.partition && r.partitionKey !== opts.partition.index) continue; + if (r.nextAttemptAt != null && r.nextAttemptAt > now) continue; + r.status = 'in_flight'; + r.claimedBy = opts.nodeId; + r.claimedAt = now; + r.updatedAt = now; + out.push({ ...r }); + } + return out; + } + + async ack(id: string, result: AckResult): Promise { + const r = this.rows.get(id); + if (!r) return; + const now = this.clock(); + r.attempts += 1; + r.lastAttemptedAt = now; + r.claimedBy = undefined; + r.claimedAt = undefined; + r.updatedAt = now; + if (result.success) { + r.status = 'success'; + r.nextAttemptAt = undefined; + r.error = undefined; + } else if (result.suppressed) { + r.status = 'suppressed'; + r.error = result.error; + } else if (result.dead) { + r.status = 'dead'; + r.error = result.error; + } else { + r.status = 'pending'; + r.nextAttemptAt = result.nextAttemptAt; + r.error = result.error; + } + } + + async list(filter?: { status?: DeliveryStatus; notificationId?: string }): Promise { + let rows = [...this.rows.values()]; + if (filter?.status) rows = rows.filter((r) => r.status === filter.status); + if (filter?.notificationId) rows = rows.filter((r) => r.notificationId === filter.notificationId); + return rows.map((r) => ({ ...r })); + } +} diff --git a/packages/services/service-messaging/src/messaging-service-plugin.ts b/packages/services/service-messaging/src/messaging-service-plugin.ts index 1208d9edb..af09cc32b 100644 --- a/packages/services/service-messaging/src/messaging-service-plugin.ts +++ b/packages/services/service-messaging/src/messaging-service-plugin.ts @@ -1,10 +1,13 @@ // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. +import { randomUUID } from 'node:crypto'; import type { Plugin, PluginContext } from '@objectstack/core'; import type { IDataEngine } from '@objectstack/spec/contracts'; import { MessagingService } from './messaging-service.js'; import { createInboxChannel } from './inbox-channel.js'; -import { InboxMessage, NotificationReceipt } from './objects/index.js'; +import { SqlNotificationOutbox } from './sql-outbox.js'; +import { NotificationDispatcher, type DispatchCluster } from './dispatcher.js'; +import { InboxMessage, NotificationReceipt, NotificationDelivery } from './objects/index.js'; export interface MessagingServicePluginOptions { /** @@ -12,11 +15,21 @@ export interface MessagingServicePluginOptions { * Set `false` only for tests that want an empty registry. */ registerInbox?: boolean; + /** + * Run the durable delivery outbox + dispatcher (ADR-0030 P1) when a data + * engine is available (default `true`). When off (or no engine), `emit()` + * fans out inline best-effort (P0 behavior). + */ + reliableDelivery?: boolean; + /** Outbox/dispatcher partition count (default 8). */ + partitionCount?: number; + /** Dispatcher tick interval in ms (default 500). */ + dispatchIntervalMs?: number; } /** - * MessagingServicePlugin — registers the `messaging` service (ADR-0012 M1, - * minimal slice). + * MessagingServicePlugin — registers the `messaging` service (ADR-0012 / + * ADR-0030). * * After bootstrap, `kernel.getService('messaging')` is a {@link MessagingService} * with the always-on `inbox` channel registered. The baseline `notify` flow @@ -24,6 +37,11 @@ export interface MessagingServicePluginOptions { * plugin is installed. Other channels (email/webhook/push/IM) register * themselves on this same service. * + * At `kernel:ready` (engine available) the plugin wires the reliable-delivery + * path: a `SqlNotificationOutbox` over `sys_notification_delivery` plus a + * `NotificationDispatcher` that drains it with retry/backoff/dead-letter. + * `emit()` then enqueues durable deliveries instead of fanning out inline. + * * @example * ```ts * const kernel = new ObjectKernel(); @@ -38,23 +56,27 @@ export class MessagingServicePlugin implements Plugin { type = 'standard' as const; dependencies = ['com.objectstack.engine.objectql']; - private readonly options: MessagingServicePluginOptions; + private readonly options: Required; + private dispatcher?: NotificationDispatcher; constructor(options: MessagingServicePluginOptions = {}) { - this.options = { registerInbox: true, ...options }; + this.options = { + registerInbox: true, + reliableDelivery: true, + partitionCount: 8, + dispatchIntervalMs: 500, + ...options, + }; } async init(ctx: PluginContext): Promise { - // Shared lazy data-engine resolver — used both to persist the L2 - // `sys_notification` event in `emit()` and by the inbox channel to - // materialize rows. Resolved lazily so it works regardless of plugin - // init order. + // Shared lazy data-engine resolver — used to persist the L2 + // `sys_notification` event in `emit()`, by the inbox channel to + // materialize rows, and (at kernel:ready) to back the outbox. Resolved + // lazily so it works regardless of plugin init order. const getData = (): IDataEngine | undefined => { try { - return ( - ctx.getService('data') ?? - ctx.getService('objectql') - ); + return ctx.getService('data') ?? ctx.getService('objectql'); } catch { return undefined; } @@ -68,18 +90,60 @@ export class MessagingServicePlugin implements Plugin { ctx.registerService('messaging', service); - // Register the inbox object so `sys_inbox_message` rows can be written. + // Register the messaging objects so their rows can be written. ctx.getService<{ register(m: unknown): void }>('manifest').register({ id: 'com.objectstack.service.messaging', name: 'Messaging Service', version: '1.0.0', type: 'plugin', scope: 'system', - objects: [InboxMessage, NotificationReceipt], + objects: [InboxMessage, NotificationReceipt, NotificationDelivery], }); + // Reliable delivery (P1): wire the outbox + dispatcher once the engine + // is resolvable. Until then `emit()` runs inline best-effort. + if (this.options.reliableDelivery && typeof ctx.hook === 'function') { + ctx.hook('kernel:ready', async () => { + const engine = getData(); + if (!engine) { + ctx.logger.warn('[messaging] no data engine at kernel:ready — reliable delivery disabled (inline fan-out)'); + return; + } + const outbox = new SqlNotificationOutbox(engine, { partitionCount: this.options.partitionCount }); + service.setOutbox(outbox); + + let cluster: DispatchCluster | undefined; + try { + cluster = ctx.getService('cluster'); + } catch { + cluster = undefined; // single-node fallback in the dispatcher + } + + this.dispatcher = new NotificationDispatcher({ + nodeId: `notify-${process.pid}-${randomUUID().slice(0, 8)}`, + outbox, + channels: service, + channelContext: { logger: ctx.logger }, + cluster, + partitionCount: this.options.partitionCount, + intervalMs: this.options.dispatchIntervalMs, + logger: ctx.logger, + }); + this.dispatcher.start(); + ctx.logger.info( + `[messaging] reliable delivery on (outbox + dispatcher, ${this.options.partitionCount} partitions${cluster ? ', clustered' : ', single-node'})`, + ); + }); + } + ctx.logger.info( `[messaging] service registered with channels: ${service.getRegisteredChannels().join(', ') || '(none)'}`, ); } + + /** Stop the dispatcher loop on shutdown. */ + async stop(): Promise { + await this.dispatcher?.stop(); + this.dispatcher = undefined; + } } diff --git a/packages/services/service-messaging/src/messaging-service.test.ts b/packages/services/service-messaging/src/messaging-service.test.ts index e03f1a284..a50e68de6 100644 --- a/packages/services/service-messaging/src/messaging-service.test.ts +++ b/packages/services/service-messaging/src/messaging-service.test.ts @@ -2,6 +2,7 @@ import { describe, it, expect, beforeEach } from 'vitest'; import { MessagingService } from './messaging-service.js'; +import { MemoryNotificationOutbox } from './memory-outbox.js'; import type { Delivery, MessagingChannel, SendResult } from './channel.js'; function silentLogger() { @@ -119,7 +120,10 @@ describe('MessagingService', () => { expect(inbox.seen.map((d) => d.recipient)).toEqual(['user_1']); }); - it('skips deferred role:/team:/owner_of: selectors until P1 (no recipients)', async () => { + it('resolves role:/team:/owner_of: to 0 recipients when no directory (data) is present', async () => { + // Without a data engine the RecipientResolver can't query membership, + // so these selectors yield no recipients (rather than throwing). + // Directory-backed expansion is covered in recipient-resolver.test.ts. const inbox = recordingChannel('inbox'); service.registerChannel(inbox.channel); const result = await service.emit({ @@ -132,6 +136,39 @@ describe('MessagingService', () => { expect(result.failed).toBe(0); }); + it('resolves role:/team:/owner_of: through the data engine when present', async () => { + const engine = { + async insert(_o: string, row: any) { return { id: 'evt_x', ...row }; }, + async find(object: string) { + if (object === 'sys_member') return [{ user_id: 'u_admin1' }, { user_id: 'u_admin2' }]; + if (object === 'sys_team_member') return [{ user_id: 'u_sales' }]; + return []; + }, + async findOne(object: string) { + return object === 'lead' ? { id: 'l1', owner_id: 'u_owner' } : null; + }, + async update() { return {}; }, + async delete() { return {}; }, + async count() { return 0; }, + async aggregate() { return []; }, + } as any; + service = new MessagingService({ logger: silentLogger(), getData: () => engine }); + const inbox = recordingChannel('inbox'); + service.registerChannel(inbox.channel); + + const result = await service.emit({ + topic: 't', + audience: ['role:admin', 'team:sales', { ownerOf: { object: 'lead', id: 'l1' } }, 'u_admin1'], + payload: { title: 'Hi' }, + }); + + // u_admin1 de-duped against the role expansion; owner resolved from the record. + expect(inbox.seen.map((d) => d.recipient).sort()).toEqual( + ['u_admin1', 'u_admin2', 'u_owner', 'u_sales'].sort(), + ); + expect(result.delivered).toBe(4); + }); + it('fans out across every requested channel', async () => { const inbox = recordingChannel('inbox'); const email = recordingChannel('email'); @@ -182,6 +219,30 @@ describe('MessagingService', () => { }); }); + describe('emit() with a delivery outbox (P1)', () => { + it('enqueues a pending delivery per (recipient × channel) instead of fanning out inline', async () => { + const outbox = new MemoryNotificationOutbox(1); + const inbox = recordingChannel('inbox'); + service = new MessagingService({ logger: silentLogger(), outbox }); + service.registerChannel(inbox.channel); + + const result = await service.emit({ + topic: 'deal.won', + audience: ['user_1', 'user_2'], + payload: { title: 'Deal closed', body: 'Acme' }, + }); + + // Nothing sent inline — the dispatcher owns the send. + expect(inbox.seen).toHaveLength(0); + expect(result.delivered).toBe(2); // 2 enqueued (accepted) + const rows = await outbox.list(); + expect(rows).toHaveLength(2); + expect(rows.every((r) => r.status === 'pending')).toBe(true); + expect(rows[0].payload).toMatchObject({ title: 'Deal closed', body: 'Acme', severity: 'info' }); + expect(rows.map((r) => r.recipientId).sort()).toEqual(['user_1', 'user_2']); + }); + }); + describe('emit() L2 event persistence', () => { it('writes one sys_notification event row carrying topic/payload/severity/source/actor', async () => { const data = fakeData(); diff --git a/packages/services/service-messaging/src/messaging-service.ts b/packages/services/service-messaging/src/messaging-service.ts index 377c27955..a73a034eb 100644 --- a/packages/services/service-messaging/src/messaging-service.ts +++ b/packages/services/service-messaging/src/messaging-service.ts @@ -6,6 +6,8 @@ import type { MessagingChannelContext, Notification, } from './channel.js'; +import { RecipientResolver } from './recipient-resolver.js'; +import type { INotificationOutbox } from './outbox.js'; /** The L2 event object every `emit()` writes one row to (ADR-0030). */ export const NOTIFICATION_EVENT_OBJECT = 'sys_notification'; @@ -80,11 +82,17 @@ export interface MessagingServiceContext extends MessagingChannelContext { getData?(): IDataEngine | undefined; /** Clock injection for deterministic tests. Defaults to `new Date()`. */ now?(): string; + /** Override the recipient resolver (tests). Defaults to a data-backed one. */ + recipientResolver?: RecipientResolver; + /** + * Durable delivery outbox (ADR-0030 P1). When present, `emit()` enqueues a + * `pending` delivery row per `(recipient × channel)` and the + * `NotificationDispatcher` performs the send + retries. When absent, `emit()` + * fans out inline (best-effort, no retry) — the P0 behavior. + */ + outbox?: INotificationOutbox; } -/** Selector prefixes the inline resolver forwards verbatim (expanded in P1). */ -const DEFERRED_SELECTOR = /^(role:|team:|owner_of:)/; - /** * MessagingService — the notification dispatcher (ADR-0012 / ADR-0030). * @@ -105,9 +113,24 @@ const DEFERRED_SELECTOR = /^(role:|team:|owner_of:)/; export class MessagingService { private readonly channels = new Map(); private readonly now: () => string; + private readonly resolver: RecipientResolver; + private outbox?: INotificationOutbox; constructor(private readonly ctx: MessagingServiceContext) { this.now = ctx.now ?? (() => new Date().toISOString()); + this.resolver = + ctx.recipientResolver ?? + new RecipientResolver({ getData: () => ctx.getData?.(), logger: ctx.logger }); + this.outbox = ctx.outbox; + } + + /** + * Attach the durable delivery outbox after construction. The plugin wires + * this once the data engine is resolvable (kernel:ready), switching `emit()` + * from inline fan-out to the reliable enqueue → dispatcher path. + */ + setOutbox(outbox: INotificationOutbox): void { + this.outbox = outbox; } /** Register a channel implementation. A duplicate id warns and replaces. */ @@ -159,15 +182,33 @@ export class MessagingService { // 2) Write the L2 event (or synthesize an id when there is no data layer). const notificationId = await this.writeEvent(data, input); - // 3) Resolve the audience to recipients. - const recipients = this.resolveRecipients(input.audience); + // 3) Resolve the audience to recipient user ids (RecipientResolver owns + // role:/team:/owner_of:/email→id expansion). + const recipients = await this.resolver.resolve(input.audience, { + organizationId: input.organizationId, + }); if (recipients.length === 0) { this.ctx.logger.warn(`[messaging] emit: topic '${input.topic}' resolved to 0 recipients`); return { notificationId, deduped: false, deliveries: [], delivered: 0, failed: 0 }; } - // 4) Derive the per-recipient notification and fan out to channels. + // 4) Either enqueue durable deliveries (P1 outbox) or fan out inline (P0). const payload = input.payload ?? {}; + const channels = input.channels?.length ? input.channels : ['inbox']; + + if (this.outbox) { + const deliveries = await this.enqueueDeliveries( + this.outbox, + notificationId, + recipients, + channels, + input, + payload, + ); + const delivered = deliveries.filter((d) => d.ok).length; + return { notificationId, deduped: false, deliveries, delivered, failed: deliveries.length - delivered }; + } + const notification: Notification = { notificationId, organizationId: input.organizationId, @@ -185,6 +226,50 @@ export class MessagingService { return { notificationId, deduped: false, deliveries, delivered, failed }; } + /** + * Enqueue one `pending` delivery row per `(channel × recipient)`. The + * dispatcher does the actual send + retry; here `ok` means "accepted for + * delivery" (enqueued), not yet delivered — progress is observable on the + * `sys_notification_delivery` row. + */ + private async enqueueDeliveries( + outbox: INotificationOutbox, + notificationId: string, + recipients: string[], + channels: string[], + input: EmitInput, + payload: Record, + ): Promise { + // Snapshot the rendered content onto each delivery so a later event edit + // can't rewrite an in-flight send. + const deliveryPayload = { + ...payload, + title: str(payload.title) ?? input.topic, + body: str(payload.body) ?? '', + severity: input.severity ?? 'info', + actionUrl: str(payload.url) ?? str(payload.actionUrl), + }; + const deliveries: DeliveryOutcome[] = []; + for (const channel of channels) { + for (const recipient of recipients) { + try { + const id = await outbox.enqueue({ + notificationId, + recipientId: recipient, + channel, + topic: input.topic, + payload: deliveryPayload, + organizationId: input.organizationId, + }); + deliveries.push({ channel, recipient, ok: true, externalId: id }); + } catch (err) { + deliveries.push({ channel, recipient, ok: false, error: (err as Error)?.message ?? String(err) }); + } + } + } + return deliveries; + } + /** Find an existing event id by its dedup key, tolerating lookup failure. */ private async findEventByDedupKey(data: IDataEngine, dedupKey: string): Promise { try { @@ -228,32 +313,6 @@ export class MessagingService { return id != null ? String(id) : `evt_${Math.random().toString(36).slice(2)}`; } - /** - * Expand an audience to a flat recipient list. P0 keeps explicit ids and - * email-shaped entries (email→id finishes at the inbox channel); deferred - * `role:`/`team:`/`owner_of:` selectors warn and are dropped until the P1 - * `RecipientResolver` lands. - */ - private resolveRecipients(audience: Audience): string[] { - const specs = Array.isArray(audience) ? audience : [audience as AudienceSpec]; - const out: string[] = []; - for (const spec of specs) { - if (typeof spec !== 'string') { - this.ctx.logger.warn( - `[messaging] owner_of: audience resolution lands in P1; '${JSON.stringify(spec)}' skipped`, - ); - continue; - } - if (DEFERRED_SELECTOR.test(spec)) { - this.ctx.logger.warn(`[messaging] audience selector '${spec}' resolution lands in P1; skipped`); - continue; - } - if (spec.trim()) out.push(spec.trim()); - } - // De-dup while preserving order. - return [...new Set(out)]; - } - /** * Fan a notification out to its channels and recipients. Each * `(channel, recipient)` pair becomes one `send()` call. An unregistered diff --git a/packages/services/service-messaging/src/objects/index.ts b/packages/services/service-messaging/src/objects/index.ts index b4d8882cc..4605e406b 100644 --- a/packages/services/service-messaging/src/objects/index.ts +++ b/packages/services/service-messaging/src/objects/index.ts @@ -2,3 +2,4 @@ export { InboxMessage } from './inbox-message.object.js'; export { NotificationReceipt } from './notification-receipt.object.js'; +export { NotificationDelivery } from './notification-delivery.object.js'; diff --git a/packages/services/service-messaging/src/objects/notification-delivery.object.ts b/packages/services/service-messaging/src/objects/notification-delivery.object.ts new file mode 100644 index 000000000..c81a9cf88 --- /dev/null +++ b/packages/services/service-messaging/src/objects/notification-delivery.object.ts @@ -0,0 +1,75 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { ObjectSchema, Field } from '@objectstack/spec/data'; + +/** + * `sys_notification_delivery` — the durable outbox (ADR-0030 Layer 4). + * + * One row per `(event × recipient × channel)`. The spine of reliable delivery: + * `emit()` writes rows in `pending`; the `NotificationDispatcher` claims them + * (`pending → in_flight`), sends via the channel, and acks the outcome + * (`success` / back to `pending` with a `next_attempt_at` for retry / `dead` + * once the retry budget is exhausted). Mirrors `sys_webhook_delivery`. + * + * `payload` snapshots the rendered notification content at enqueue time so a + * later edit of the L2 event can't rewrite an in-flight delivery (and so the + * dispatcher needs no second read to send). Timestamps are epoch ms. + */ +export const NotificationDelivery = ObjectSchema.create({ + name: 'sys_notification_delivery', + label: 'Notification Delivery', + pluralLabel: 'Notification Deliveries', + icon: 'send', + isSystem: true, + managedBy: 'system', + description: 'Durable per-recipient × channel delivery outbox (ADR-0030 Layer 4).', + titleFormat: '{channel} → {recipient_id}', + compactLayout: ['notification_id', 'recipient_id', 'channel', 'status', 'attempts'], + + fields: { + id: Field.text({ label: 'Delivery ID', required: true, readonly: true }), + + notification_id: Field.text({ + label: 'Notification Event', + required: true, + searchable: true, + description: 'FK → sys_notification (L2 event)', + }), + recipient_id: Field.text({ label: 'Recipient User', required: true, searchable: true }), + channel: Field.text({ label: 'Channel', required: true }), + topic: Field.text({ label: 'Topic', searchable: true }), + + payload: Field.json({ + label: 'Payload', + description: 'Snapshot of the rendered notification content for dispatch.', + }), + + status: Field.select(['pending', 'in_flight', 'success', 'failed', 'dead', 'suppressed'], { + label: 'Status', + required: true, + defaultValue: 'pending', + }), + + attempts: Field.number({ label: 'Attempts', defaultValue: 0 }), + partition_key: Field.number({ label: 'Partition Key', defaultValue: 0 }), + + claimed_by: Field.text({ label: 'Claimed By', description: 'Node id while in_flight' }), + claimed_at: Field.number({ label: 'Claimed At (ms)' }), + next_attempt_at: Field.number({ label: 'Next Attempt At (ms)' }), + last_attempted_at: Field.number({ label: 'Last Attempted At (ms)' }), + error: Field.textarea({ label: 'Error' }), + + created_at: Field.number({ label: 'Created At (ms)', readonly: true }), + updated_at: Field.number({ label: 'Updated At (ms)' }), + }, + + indexes: [ + // Dedup: one delivery per (event, recipient, channel). + { fields: ['notification_id', 'recipient_id', 'channel'], unique: true }, + // The hot claim query. + { fields: ['status', 'partition_key', 'next_attempt_at'] }, + // Stale-in_flight reaper. + { fields: ['status', 'claimed_at'] }, + { fields: ['notification_id'] }, + ], +}); diff --git a/packages/services/service-messaging/src/outbox.ts b/packages/services/service-messaging/src/outbox.ts new file mode 100644 index 000000000..5a396c10a --- /dev/null +++ b/packages/services/service-messaging/src/outbox.ts @@ -0,0 +1,100 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +/** + * Notification delivery outbox contracts (ADR-0030 Layer 4). + * + * The outbox stores `(event × recipient × channel)` delivery rows that must be + * sent reliably (at-least-once + receiver-side idempotency on the materialized + * artifact). Implementations are pluggable so the same dispatcher runs against + * an in-memory test store or the SQL-backed `sys_notification_delivery` table. + * Mirrors the proven `plugin-webhooks` outbox. + */ + +export type DeliveryStatus = + | 'pending' + | 'in_flight' + | 'success' + | 'failed' + | 'dead' + | 'suppressed'; + +/** Rendered content snapshot carried on the delivery row for dispatch. */ +export interface DeliveryPayload { + title?: string; + body?: string; + severity?: 'info' | 'warning' | 'critical' | string; + actionUrl?: string; + [k: string]: unknown; +} + +export interface NotificationDeliveryRecord { + id: string; + notificationId: string; + recipientId: string; + channel: string; + topic?: string; + payload: DeliveryPayload; + organizationId?: string; + partitionKey: number; + status: DeliveryStatus; + attempts: number; + claimedBy?: string; + claimedAt?: number; + nextAttemptAt?: number; + lastAttemptedAt?: number; + error?: string; + createdAt: number; + updatedAt: number; +} + +export interface EnqueueDeliveryInput { + notificationId: string; + recipientId: string; + channel: string; + topic?: string; + payload: DeliveryPayload; + organizationId?: string; +} + +export interface ClaimOptions { + nodeId: string; + limit: number; + /** Only claim rows whose `hash(notificationId) mod count === index`. */ + partition?: { index: number; count: number }; + /** Visibility timeout — claimed rows revert to pending after this many ms. */ + claimTtlMs: number; + /** "Now" reference, ms. Defaults to Date.now(). */ + now?: number; +} + +export interface AckSuccess { + success: true; + durationMs?: number; +} + +export interface AckFailure { + success: false; + error?: string; + durationMs?: number; + /** Computed by the dispatcher per the retry schedule, or undefined for dead. */ + nextAttemptAt?: number; + /** Marks the row terminal `dead` — retry budget exhausted / permanent error. */ + dead?: boolean; + /** Marks the row terminal `suppressed` — intentionally not delivered. */ + suppressed?: boolean; +} + +export type AckResult = AckSuccess | AckFailure; + +/** + * Pluggable storage for delivery rows. `claim()` MUST be atomic across + * concurrent callers (the at-least-once guarantee), and `enqueue()` MUST treat + * `(notificationId, recipientId, channel)` as unique (silently returning the + * existing id on a duplicate) so a repeated `emit` can't double-deliver. + */ +export interface INotificationOutbox { + enqueue(input: EnqueueDeliveryInput): Promise; + claim(opts: ClaimOptions): Promise; + ack(id: string, result: AckResult): Promise; + list(filter?: { status?: DeliveryStatus; notificationId?: string }): Promise; +} diff --git a/packages/services/service-messaging/src/recipient-resolver.test.ts b/packages/services/service-messaging/src/recipient-resolver.test.ts new file mode 100644 index 000000000..ea7d79fec --- /dev/null +++ b/packages/services/service-messaging/src/recipient-resolver.test.ts @@ -0,0 +1,151 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect } from 'vitest'; +import { RecipientResolver } from './recipient-resolver.js'; + +function silentLogger() { + return { warn: () => {}, info: () => {} }; +} + +/** + * Fake data engine. `findOne` answers email→id and owner_of record reads; + * `find` answers role:/team: membership queries. Both keyed by object name. + */ +function fakeData(opts: { + users?: Record; // email → id + members?: Record; // role → user ids + teams?: Record; // team id → user ids + records?: Record>; // `${object}:${id}` → row + throwOn?: string; // object name to throw for +} = {}) { + const calls: Array<{ method: string; object: string; query: any }> = []; + return { + calls, + engine: { + async find(object: string, query: any) { + calls.push({ method: 'find', object, query }); + if (object === opts.throwOn) throw new Error('locked'); + if (object === 'sys_member') { + const role = query?.where?.role; + return (opts.members?.[role] ?? []).map((id) => ({ user_id: id })); + } + if (object === 'sys_team_member') { + const team = query?.where?.team_id; + return (opts.teams?.[team] ?? []).map((id) => ({ user_id: id })); + } + return []; + }, + async findOne(object: string, query: any) { + calls.push({ method: 'findOne', object, query }); + if (object === opts.throwOn) throw new Error('locked'); + if (object === 'sys_user') { + const email = query?.where?.email; + const id = opts.users?.[email]; + return id ? { id } : null; + } + const rid = query?.where?.id; + return opts.records?.[`${object}:${rid}`] ?? null; + }, + async insert() { return {}; }, + async update() { return {}; }, + async delete() { return {}; }, + async count() { return 0; }, + async aggregate() { return []; }, + } as any, + }; +} + +function resolver(data?: ReturnType) { + return new RecipientResolver({ getData: () => data?.engine, logger: silentLogger() }); +} + +describe('RecipientResolver', () => { + it('passes bare user ids through and de-dups while preserving order', async () => { + const r = resolver(fakeData()); + expect(await r.resolve(['u1', 'u2', 'u1'])).toEqual(['u1', 'u2']); + }); + + it('accepts a single (non-array) spec', async () => { + expect(await resolver(fakeData()).resolve('u9')).toEqual(['u9']); + }); + + it('strips the user: prefix', async () => { + expect(await resolver(fakeData()).resolve(['user:u5'])).toEqual(['u5']); + }); + + it('resolves an email to its sys_user id', async () => { + const data = fakeData({ users: { 'ada@example.com': 'usr_ada' } }); + expect(await resolver(data).resolve(['ada@example.com'])).toEqual(['usr_ada']); + expect(data.calls[0]).toMatchObject({ method: 'findOne', object: 'sys_user' }); + }); + + it('keeps an email verbatim when no user matches', async () => { + const data = fakeData({ users: {} }); + expect(await resolver(data).resolve(['ghost@example.com'])).toEqual(['ghost@example.com']); + }); + + it('treats an @-containing token without a domain dot as a bare id (no email lookup)', async () => { + const data = fakeData(); + // 'svc@queue' has no domain dot → not email-shaped → passed through verbatim. + expect(await resolver(data).resolve(['svc@queue'])).toEqual(['svc@queue']); + expect(data.calls.filter((c) => c.object === 'sys_user')).toHaveLength(0); + }); + + it('does not treat whitespace-bearing tokens as emails (and stays linear on adversarial input)', async () => { + const data = fakeData(); + // The old regex backtracked polynomially on '!@' + '!.'×N; the linear + // heuristic returns promptly. No sys_user lookup for a whitespace token. + const adversarial = `a@${'!.'.repeat(5000)}`; + const started = Date.now(); + await resolver(data).resolve([adversarial, 'a b@x.com']); + expect(Date.now() - started).toBeLessThan(1000); + expect(data.calls.filter((c) => c.object === 'sys_user' && c.query?.where?.email === 'a b@x.com')).toHaveLength(0); + }); + + it('expands role: via sys_member (tenant-scoped) and de-dups', async () => { + const data = fakeData({ members: { admin: ['a', 'b', 'a'] } }); + const out = await resolver(data).resolve(['role:admin'], { organizationId: 'org_1' }); + expect(out).toEqual(['a', 'b']); + expect(data.calls[0]).toMatchObject({ method: 'find', object: 'sys_member' }); + expect(data.calls[0].query.where).toEqual({ role: 'admin', organization_id: 'org_1' }); + }); + + it('expands team: via sys_team_member', async () => { + const data = fakeData({ teams: { sales: ['s1', 's2'] } }); + expect(await resolver(data).resolve(['team:sales'])).toEqual(['s1', 's2']); + expect(data.calls[0].query.where).toEqual({ team_id: 'sales' }); + }); + + it('resolves owner_of: (structured form) to the record owner field', async () => { + const data = fakeData({ records: { 'lead:l1': { id: 'l1', owner_id: 'u_owner' } } }); + expect(await resolver(data).resolve([{ ownerOf: { object: 'lead', id: 'l1' } }])).toEqual(['u_owner']); + }); + + it('resolves owner_of: (string form) and tries assigned_to when owner_id absent', async () => { + const data = fakeData({ records: { 'task:t1': { id: 't1', assigned_to: 'u_assignee' } } }); + expect(await resolver(data).resolve(['owner_of:task:t1'])).toEqual(['u_assignee']); + }); + + it('mixes specs, expanding and de-duplicating across them', async () => { + const data = fakeData({ + members: { admin: ['a', 'shared'] }, + teams: { sales: ['shared', 's1'] }, + }); + const out = await resolver(data).resolve(['role:admin', 'team:sales', 'direct']); + expect(out).toEqual(['a', 'shared', 's1', 'direct']); + }); + + it('yields 0 recipients (no throw) when no data engine is present', async () => { + const r = resolver(undefined); + expect(await r.resolve(['role:admin', 'team:x', { ownerOf: { object: 'lead', id: 'l1' } }])).toEqual([]); + }); + + it('is best-effort: a failed membership query resolves to 0, not a throw', async () => { + const data = fakeData({ throwOn: 'sys_member' }); + expect(await resolver(data).resolve(['role:admin', 'u_keep'])).toEqual(['u_keep']); + }); + + it('skips a malformed owner_of string', async () => { + expect(await resolver(fakeData()).resolve(['owner_of:noseparator'])).toEqual([]); + }); +}); diff --git a/packages/services/service-messaging/src/recipient-resolver.ts b/packages/services/service-messaging/src/recipient-resolver.ts new file mode 100644 index 000000000..84a0d7aed --- /dev/null +++ b/packages/services/service-messaging/src/recipient-resolver.ts @@ -0,0 +1,220 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import type { IDataEngine } from '@objectstack/spec/contracts'; +import type { Audience, AudienceSpec } from './messaging-service.js'; + +/** + * Cheap "looks like an email" heuristic so we attempt id resolution. Hand-rolled + * (no regex) to stay strictly linear — the obvious `^[^\s@]+@[^\s@]+\.[^\s@]+$` + * has overlapping quantifiers (the `.` is also in `[^\s@]`) and backtracks + * polynomially on adversarial input (ReDoS). This is O(n). + */ +function looksLikeEmail(s: string): boolean { + if (!s || /\s/.test(s)) return false; // single char-class test — linear + const at = s.indexOf('@'); + if (at <= 0 || at !== s.lastIndexOf('@') || at === s.length - 1) return false; + const domain = s.slice(at + 1); + const dot = domain.indexOf('.'); + // a dot in the domain, not first or last char + return dot > 0 && dot < domain.length - 1; +} + +/** The user identity object an email-shaped recipient is resolved against. */ +export const USER_OBJECT = 'sys_user'; +/** Tenant-membership object backing `role:` expansion (`sys_member.role`). */ +export const MEMBER_OBJECT = 'sys_member'; +/** Team-membership object backing `team:` expansion (`sys_team_member.team_id`). */ +export const TEAM_MEMBER_OBJECT = 'sys_team_member'; + +/** + * Conventional owner/assignee field names tried, in order, for `owner_of:` + * audience resolution. Mirrors the audit writer's `OWNER_FIELDS`. + */ +const DEFAULT_OWNER_FIELDS = ['owner_id', 'assigned_to', 'assignee_id', 'owner', 'assignee']; + +export interface RecipientResolverLogger { + warn(...args: unknown[]): void; + info?(...args: unknown[]): void; +} + +export interface RecipientResolverOptions { + /** Resolve the runtime data engine. `undefined` on a minimal/test stack. */ + getData(): IDataEngine | undefined; + logger: RecipientResolverLogger; + /** Identity object for email→id resolution (default {@link USER_OBJECT}). */ + userObject?: string; + /** Membership object for `role:` (default {@link MEMBER_OBJECT}). */ + memberObject?: string; + /** Membership object for `team:` (default {@link TEAM_MEMBER_OBJECT}). */ + teamMemberObject?: string; + /** Owner field candidates for `owner_of:` (default {@link DEFAULT_OWNER_FIELDS}). */ + ownerFields?: string[]; +} + +export interface ResolveContext { + /** Tenant scope applied to `role:` expansion. */ + organizationId?: string; +} + +/** + * RecipientResolver (ADR-0030 P1) — expands an {@link Audience} into a flat, + * de-duplicated list of recipient **user ids**. + * + * Reuses the platform's existing identity/membership object model (the same + * `sys_member.role` / `sys_team_member.team_id` graph `plugin-sharing`'s + * `TeamGraphService` reads) by querying it directly through the data engine — + * so `service-messaging` gains no backward dependency on a plugin. This is the + * single home for recipient resolution: the inbox channel no longer does its + * own email→id fallback (that moved here per ADR-0030 P1). + * + * Supported audience specs: + * - `''` → the id verbatim + * - `'user:'` → the id (prefix stripped) + * - `''` → `sys_user` lookup by email → id (verbatim on miss) + * - `'role:'` → `sys_member` where `role = name` (tenant-scoped) + * - `'team:'` → `sys_team_member` where `team_id = id` + * - `'owner_of::'` → the owner field of that record + * - `{ ownerOf: { object, id } }` → same, structured form + * + * Every lookup is best-effort: a failed query resolves to no recipients for + * that spec (logged) rather than throwing — emit() must never fail because a + * directory lookup hiccupped. + */ +export class RecipientResolver { + private readonly userObject: string; + private readonly memberObject: string; + private readonly teamMemberObject: string; + private readonly ownerFields: string[]; + + constructor(private readonly opts: RecipientResolverOptions) { + this.userObject = opts.userObject ?? USER_OBJECT; + this.memberObject = opts.memberObject ?? MEMBER_OBJECT; + this.teamMemberObject = opts.teamMemberObject ?? TEAM_MEMBER_OBJECT; + this.ownerFields = opts.ownerFields ?? DEFAULT_OWNER_FIELDS; + } + + /** Expand an audience to a de-duplicated list of recipient user ids. */ + async resolve(audience: Audience, ctx: ResolveContext = {}): Promise { + const specs = Array.isArray(audience) ? audience : [audience as AudienceSpec]; + const data = this.opts.getData(); + const out: string[] = []; + + for (const spec of specs) { + for (const id of await this.resolveOne(spec, data, ctx)) { + if (id) out.push(id); + } + } + // De-dup while preserving order. + return [...new Set(out)]; + } + + private async resolveOne( + spec: AudienceSpec, + data: IDataEngine | undefined, + ctx: ResolveContext, + ): Promise { + if (typeof spec !== 'string') { + // Structured `{ ownerOf: { object, id } }`. + if (spec && typeof spec === 'object' && 'ownerOf' in spec) { + return this.resolveOwnerOf(spec.ownerOf.object, spec.ownerOf.id, data); + } + this.opts.logger.warn(`[recipients] unrecognized audience spec ${JSON.stringify(spec)}; skipped`); + return []; + } + + const value = spec.trim(); + if (!value) return []; + + if (value.startsWith('user:')) return [value.slice(5)].filter(Boolean); + if (value.startsWith('role:')) return this.resolveRole(value.slice(5), data, ctx); + if (value.startsWith('team:')) return this.resolveTeam(value.slice(5), data); + if (value.startsWith('owner_of:')) { + // `owner_of::` — id may itself contain ':' (rare), so + // split only on the first two segments. + const rest = value.slice('owner_of:'.length); + const sep = rest.indexOf(':'); + if (sep > 0) return this.resolveOwnerOf(rest.slice(0, sep), rest.slice(sep + 1), data); + this.opts.logger.warn(`[recipients] malformed owner_of spec '${value}'; skipped`); + return []; + } + if (looksLikeEmail(value)) return [await this.resolveEmail(value, data)]; + // Bare user id. + return [value]; + } + + /** `role:` → `sys_member` rows with that role in the tenant. */ + private async resolveRole(role: string, data: IDataEngine | undefined, ctx: ResolveContext): Promise { + if (!role || !data) return []; + const where: Record = { role }; + if (ctx.organizationId) where.organization_id = ctx.organizationId; + try { + const rows = await data.find(this.memberObject, { where, fields: ['user_id'], limit: 10000 }); + return userIds(rows); + } catch (err) { + this.opts.logger.warn(`[recipients] role '${role}' lookup failed (${msg(err)}); 0 recipients`); + return []; + } + } + + /** `team:` → `sys_team_member` rows for that team. */ + private async resolveTeam(teamId: string, data: IDataEngine | undefined): Promise { + if (!teamId || !data) return []; + try { + const rows = await data.find(this.teamMemberObject, { + where: { team_id: teamId }, + fields: ['user_id'], + limit: 10000, + }); + return userIds(rows); + } catch (err) { + this.opts.logger.warn(`[recipients] team '${teamId}' lookup failed (${msg(err)}); 0 recipients`); + return []; + } + } + + /** `owner_of:` → the owner/assignee field of the referenced record. */ + private async resolveOwnerOf(object: string, id: string, data: IDataEngine | undefined): Promise { + if (!object || !id || !data) return []; + try { + const rec = await data.findOne(object, { where: { id }, fields: ['id', ...this.ownerFields] }); + if (!rec) return []; + for (const f of this.ownerFields) { + const v = rec[f]; + if (typeof v === 'string' && v.length > 0) return [v]; + } + return []; + } catch (err) { + this.opts.logger.warn(`[recipients] owner_of '${object}:${id}' lookup failed (${msg(err)}); 0 recipients`); + return []; + } + } + + /** + * Resolve an email-shaped recipient to its user id. Falls back to the email + * verbatim on no match or lookup error (a downstream channel may still key + * a row by it — never lose the recipient on a directory miss). + */ + private async resolveEmail(email: string, data: IDataEngine | undefined): Promise { + if (!data) return email; + try { + const user = await data.findOne(this.userObject, { where: { email }, fields: ['id'] }); + const id = user?.id; + if (id != null && String(id).length > 0) return String(id); + this.opts.logger.warn(`[recipients] no '${this.userObject}' matched email '${email}'; keeping verbatim`); + return email; + } catch (err) { + this.opts.logger.warn(`[recipients] email '${email}' lookup failed (${msg(err)}); keeping verbatim`); + return email; + } + } +} + +/** Pull distinct non-empty `user_id`s from a row set. */ +function userIds(rows: unknown): string[] { + if (!Array.isArray(rows)) return []; + return [...new Set(rows.map((r: any) => String(r?.user_id ?? '')).filter(Boolean))]; +} + +function msg(err: unknown): string { + return (err as Error)?.message ?? String(err); +} diff --git a/packages/services/service-messaging/src/sql-outbox.ts b/packages/services/service-messaging/src/sql-outbox.ts new file mode 100644 index 000000000..abb838048 --- /dev/null +++ b/packages/services/service-messaging/src/sql-outbox.ts @@ -0,0 +1,211 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { randomUUID } from 'node:crypto'; +import type { IDataEngine } from '@objectstack/spec/contracts'; +import type { + AckResult, + ClaimOptions, + DeliveryStatus, + EnqueueDeliveryInput, + INotificationOutbox, + NotificationDeliveryRecord, +} from './outbox.js'; +import { hashPartition } from './backoff.js'; + +export const DELIVERY_OBJECT = 'sys_notification_delivery'; + +export interface SqlNotificationOutboxOptions { + /** Total partitions — MUST match the dispatcher's `partitionCount`. */ + partitionCount: number; + /** Object name override (default {@link DELIVERY_OBJECT}). */ + objectName?: string; +} + +interface DeliveryRow { + id: string; + notification_id: string; + recipient_id: string; + channel: string; + topic?: string | null; + payload?: unknown; // json column — engine returns object or string per driver + organization_id?: string | null; + partition_key: number; + status: DeliveryStatus; + attempts: number; + claimed_by?: string | null; + claimed_at?: number | null; + next_attempt_at?: number | null; + last_attempted_at?: number | null; + error?: string | null; + created_at: number; + updated_at: number; +} + +/** + * Durable {@link INotificationOutbox} over ObjectQL — the production store. + * Driver-agnostic (no `FOR UPDATE SKIP LOCKED`): safety comes from the + * dispatcher's per-partition cluster lock plus the atomic + * `UPDATE … WHERE status='pending'` claim. `partition_key` is precomputed on + * enqueue (ObjectQL has no portable `hash()` in WHERE). Mirrors + * `SqlWebhookOutbox`. + */ +export class SqlNotificationOutbox implements INotificationOutbox { + private readonly objectName: string; + private readonly partitionCount: number; + + constructor(private readonly engine: IDataEngine, opts: SqlNotificationOutboxOptions) { + if (opts.partitionCount <= 0) throw new Error('SqlNotificationOutbox: partitionCount must be > 0'); + this.objectName = opts.objectName ?? DELIVERY_OBJECT; + this.partitionCount = opts.partitionCount; + } + + async enqueue(input: EnqueueDeliveryInput): Promise { + const dedup = { + notification_id: input.notificationId, + recipient_id: input.recipientId, + channel: input.channel, + }; + const existing = await this.engine.findOne(this.objectName, { where: dedup, fields: ['id'] }); + if (existing?.id) return String(existing.id); + + const id = randomUUID(); + const now = Date.now(); + const row: DeliveryRow = { + id, + notification_id: input.notificationId, + recipient_id: input.recipientId, + channel: input.channel, + topic: input.topic ?? null, + payload: input.payload ?? {}, + organization_id: input.organizationId ?? null, + partition_key: hashPartition(input.notificationId, this.partitionCount), + status: 'pending', + attempts: 0, + created_at: now, + updated_at: now, + }; + try { + await this.engine.insert(this.objectName, row); + return id; + } catch (err) { + // Unique-index collision (dedup race) → return the winner. + const winner = await this.engine.findOne(this.objectName, { where: dedup, fields: ['id'] }); + if (winner?.id) return String(winner.id); + throw err; + } + } + + async claim(opts: ClaimOptions): Promise { + const now = opts.now ?? Date.now(); + + // 1. Reap stale in_flight rows (visibility-timeout recovery). + await this.engine.update( + this.objectName, + { status: 'pending', claimed_by: null, claimed_at: null, updated_at: now }, + { where: { status: 'in_flight', claimed_at: { $lt: now - opts.claimTtlMs } }, multi: true } as any, + ); + + // 2. Candidate ids: ready pending rows in our partition. + const partitionFilter = opts.partition ? { partition_key: opts.partition.index } : {}; + const candidates = await this.engine.find(this.objectName, { + where: { + status: 'pending', + ...partitionFilter, + $or: [{ next_attempt_at: null }, { next_attempt_at: { $lte: now } }], + }, + fields: ['id'], + limit: opts.limit, + }); + if (!candidates.length) return []; + const ids = (candidates as Array<{ id: string }>).map((c) => c.id); + + // 3. Atomic claim — WHERE status='pending' rejects rows another worker took. + await this.engine.update( + this.objectName, + { status: 'in_flight', claimed_by: opts.nodeId, claimed_at: now, updated_at: now }, + { where: { id: { $in: ids }, status: 'pending' }, multi: true } as any, + ); + + // 4. Read back only the rows we own. + const claimed = (await this.engine.find(this.objectName, { + where: { id: { $in: ids }, claimed_by: opts.nodeId, claimed_at: now, status: 'in_flight' }, + })) as DeliveryRow[]; + return claimed.map((r) => this.toRecord(r)); + } + + async ack(id: string, result: AckResult): Promise { + const current = (await this.engine.findOne(this.objectName, { + where: { id }, + fields: ['attempts'], + })) as { attempts?: number } | null; + if (!current) return; + + const now = Date.now(); + let status: DeliveryStatus; + let nextAttemptAt: number | null = null; + let error: string | null = null; + + if (result.success) { + status = 'success'; + } else if (result.suppressed) { + status = 'suppressed'; + error = result.error ?? null; + } else if (result.dead) { + status = 'dead'; + error = result.error ?? null; + } else { + status = 'pending'; + nextAttemptAt = result.nextAttemptAt ?? null; + error = result.error ?? null; + } + + await this.engine.update( + this.objectName, + { + status, + attempts: (current.attempts ?? 0) + 1, + last_attempted_at: now, + claimed_by: null, + claimed_at: null, + next_attempt_at: nextAttemptAt, + error, + updated_at: now, + }, + { where: { id }, multi: false } as any, + ); + } + + async list(filter?: { status?: DeliveryStatus; notificationId?: string }): Promise { + const where: Record = {}; + if (filter?.status) where.status = filter.status; + if (filter?.notificationId) where.notification_id = filter.notificationId; + const rows = (await this.engine.find(this.objectName, { where })) as DeliveryRow[]; + return rows.map((r) => this.toRecord(r)); + } + + private toRecord(r: DeliveryRow): NotificationDeliveryRecord { + let payload = r.payload ?? {}; + if (typeof payload === 'string') { + try { payload = JSON.parse(payload); } catch { payload = {}; } + } + return { + id: r.id, + notificationId: r.notification_id, + recipientId: r.recipient_id, + channel: r.channel, + topic: r.topic ?? undefined, + payload: payload as NotificationDeliveryRecord['payload'], + organizationId: r.organization_id ?? undefined, + partitionKey: r.partition_key, + status: r.status, + attempts: r.attempts, + claimedBy: r.claimed_by ?? undefined, + claimedAt: r.claimed_at ?? undefined, + nextAttemptAt: r.next_attempt_at ?? undefined, + lastAttemptedAt: r.last_attempted_at ?? undefined, + error: r.error ?? undefined, + createdAt: r.created_at, + updatedAt: r.updated_at, + }; + } +}