diff --git a/.changeset/adr-0030-notification-p2.md b/.changeset/adr-0030-notification-p2.md new file mode 100644 index 000000000..934070800 --- /dev/null +++ b/.changeset/adr-0030-notification-p2.md @@ -0,0 +1,28 @@ +--- +"@objectstack/service-messaging": minor +--- + +ADR-0030 P2 — subscription + preference. Adds the Layer-3 preference filter so +users can mute notification topics/channels, with admin-global defaults and +mandatory-topic bypass. + +- **`sys_notification_preference`** — per `(user_id, topic, channel)` toggle + (`enabled`, plus `digest`/`quiet_hours` for P3). `user_id='*'` rows are the + admin-global default; a real-user row overrides it; `topic`/`channel` support + `*` wildcards. Unique `(user_id, topic, channel)`. +- **`sys_notification_subscription`** — standing subscription of a principal + (`role:`/`team:`/`user:`/id) to a topic (the opt-in counterpart to explicit + audience; object + schema land now, subscription-driven fan-out is a follow-up). +- **`PreferenceResolver`** — wired into `MessagingService.emit()` between + recipient resolution and fan-out/enqueue. Most-specific-wins resolution + (user→`*`, topic→`*`, channel→`*`; default ON). Two safety rules: **mandatory + topics bypass** (configurable via `mandatoryTopics`, exact or `prefix.`), and + **fail-open** (no data engine or a lookup error delivers all, never silently + drops). `emit()` now filters the `(recipient × channel)` matrix per user. +- Both objects are registered by `MessagingServicePlugin` and contributed to the + Setup app's Configuration nav slot (ADR-0029 D7), so they appear in + REST/Studio only when messaging is installed. + +Acceptance: a user muting a topic/channel stops receiving it on that channel; +mandatory topics still deliver. service-messaging suite: 66 passing +(adds `preference-resolver.test.ts` + an emit-level mute/bypass test). diff --git a/docs/handoff/adr-0030-notification-convergence.md b/docs/handoff/adr-0030-notification-convergence.md index f31d7b4eb..81a5f4256 100644 --- a/docs/handoff/adr-0030-notification-convergence.md +++ b/docs/handoff/adr-0030-notification-convergence.md @@ -141,15 +141,24 @@ flipped — the inbox is being populated the whole time.) --- -## Remaining phases (from the build spec) - -- **P1 — Reliable delivery**: `sys_notification_delivery` outbox + dispatcher - (state machine, retry/backoff, dead-letter, `dedup_key`); `RecipientResolver` - (reuse sharing/CEL resolver) owning `role:`/`owner_of:`/`team:`/email→id. Move - the inbox channel's email→id fallback up here. -- **P2 — Subscription + preference**: `sys_notification_subscription` + - `sys_notification_preference` objects + Studio config UI; mandatory-topic - bypass; admin-global + per-user-override defaults. +## Phase status (from the build spec) + +- **P0 — Seams**: ✅ shipped (#1434). Single ingress, event re-model, receipt, + producers routed through `emit()`. (objectui bell cut-over + mark-read write + path still pending — see above.) +- **P1 — Reliable delivery**: ✅ shipped (#1441). `sys_notification_delivery` + outbox + `NotificationDispatcher` (state machine, retry/backoff, dead-letter); + `RecipientResolver` owns `role:`/`owner_of:`/`team:`/email→id (the inbox + channel's email→id fallback moved up). So the audience-selector caveat above is + now resolved when a data engine is present. +- **P2 — Subscription + preference**: ✅ shipped. `sys_notification_preference` + (per user×topic×channel toggle, admin-global `*` defaults + per-user override, + wildcards) + `sys_notification_subscription`; `PreferenceResolver` wired into + `emit()` (most-specific-wins, **mandatory-topic bypass**, **fail-open**); both + objects contributed to the Setup Configuration nav. + - *Follow-ups*: subscription-driven fan-out (expand a topic's subscribers when + a producer emits without an explicit audience) is schema-only so far; + `digest`/`quiet_hours` fields exist but the batching middleware is P3. - **P3 — Channels + templates + digest**: email/push/webhook/Slack channels on connectors (ADR-0022); `sys_notification_template` (topic×channel×locale) + renderer; digest / quiet-hours middleware. diff --git a/packages/services/service-messaging/src/index.ts b/packages/services/service-messaging/src/index.ts index 9b857ea23..665dd9226 100644 --- a/packages/services/service-messaging/src/index.ts +++ b/packages/services/service-messaging/src/index.ts @@ -42,6 +42,15 @@ export type { ResolveContext, } from './recipient-resolver.js'; +// Preference filter (ADR-0030 P2) +export { PreferenceResolver, PREFERENCE_OBJECT } from './preference-resolver.js'; +export type { + PreferenceResolverOptions, + PreferenceResolverLogger, + PreferenceContext, + PreferenceTarget, +} from './preference-resolver.js'; + // Channel seam export { createInboxChannel, INBOX_OBJECT, RECEIPT_OBJECT } from './inbox-channel.js'; export type { InboxChannelOptions } from './inbox-channel.js'; @@ -77,4 +86,10 @@ export type { } from './dispatcher.js'; // Objects (metadata definitions) -export { InboxMessage, NotificationReceipt, NotificationDelivery } from './objects/index.js'; +export { + InboxMessage, + NotificationReceipt, + NotificationDelivery, + NotificationPreference, + NotificationSubscription, +} from './objects/index.js'; diff --git a/packages/services/service-messaging/src/messaging-service-plugin.ts b/packages/services/service-messaging/src/messaging-service-plugin.ts index af09cc32b..17d2f2d02 100644 --- a/packages/services/service-messaging/src/messaging-service-plugin.ts +++ b/packages/services/service-messaging/src/messaging-service-plugin.ts @@ -7,7 +7,13 @@ import { MessagingService } from './messaging-service.js'; import { createInboxChannel } from './inbox-channel.js'; import { SqlNotificationOutbox } from './sql-outbox.js'; import { NotificationDispatcher, type DispatchCluster } from './dispatcher.js'; -import { InboxMessage, NotificationReceipt, NotificationDelivery } from './objects/index.js'; +import { + InboxMessage, + NotificationReceipt, + NotificationDelivery, + NotificationPreference, + NotificationSubscription, +} from './objects/index.js'; export interface MessagingServicePluginOptions { /** @@ -25,6 +31,12 @@ export interface MessagingServicePluginOptions { partitionCount?: number; /** Dispatcher tick interval in ms (default 500). */ dispatchIntervalMs?: number; + /** + * Topics that bypass the per-user preference matrix (ADR-0030 P2) — e.g. + * security/system alerts users must not be able to mute. Exact match, or a + * `prefix.` entry for a prefix match (default none). + */ + mandatoryTopics?: readonly string[]; } /** @@ -65,6 +77,7 @@ export class MessagingServicePlugin implements Plugin { reliableDelivery: true, partitionCount: 8, dispatchIntervalMs: 500, + mandatoryTopics: [], ...options, }; } @@ -82,7 +95,11 @@ export class MessagingServicePlugin implements Plugin { } }; - const service = new MessagingService({ logger: ctx.logger, getData }); + const service = new MessagingService({ + logger: ctx.logger, + getData, + mandatoryTopics: this.options.mandatoryTopics, + }); if (this.options.registerInbox) { service.registerChannel(createInboxChannel({ getData })); @@ -90,14 +107,34 @@ export class MessagingServicePlugin implements Plugin { ctx.registerService('messaging', service); - // Register the messaging objects so their rows can be written. + // Register the messaging objects so their rows can be written. The + // preference/subscription objects (ADR-0030 P2) are Studio-configurable, + // so contribute them to the Setup app's Configuration slot (ADR-0029 D7) + // — they appear in nav only when this plugin is installed. ctx.getService<{ register(m: unknown): void }>('manifest').register({ id: 'com.objectstack.service.messaging', name: 'Messaging Service', version: '1.0.0', type: 'plugin', scope: 'system', - objects: [InboxMessage, NotificationReceipt, NotificationDelivery], + objects: [ + InboxMessage, + NotificationReceipt, + NotificationDelivery, + NotificationPreference, + NotificationSubscription, + ], + navigationContributions: [ + { + app: 'setup', + group: 'group_configuration', + priority: 120, + items: [ + { id: 'nav_notification_preferences', type: 'object', label: 'Notification Preferences', objectName: 'sys_notification_preference', icon: 'bell-ring', requiresObject: 'sys_notification_preference' }, + { id: 'nav_notification_subscriptions', type: 'object', label: 'Notification Subscriptions', objectName: 'sys_notification_subscription', icon: 'rss', requiresObject: 'sys_notification_subscription' }, + ], + }, + ], }); // Reliable delivery (P1): wire the outbox + dispatcher once the engine diff --git a/packages/services/service-messaging/src/messaging-service.test.ts b/packages/services/service-messaging/src/messaging-service.test.ts index a50e68de6..3e14d6712 100644 --- a/packages/services/service-messaging/src/messaging-service.test.ts +++ b/packages/services/service-messaging/src/messaging-service.test.ts @@ -187,6 +187,47 @@ describe('MessagingService', () => { expect(result.delivered).toBe(2); }); + it('applies the preference filter — a muted channel is dropped, a mandatory topic bypasses it', async () => { + // Engine returns a preference muting `email` for user_1 on topic 't'. + const prefRow = { user_id: 'user_1', topic: 't', channel: 'email', enabled: false }; + const engine = { + async insert(_o: string, row: any) { return { id: 'evt_1', ...row }; }, + async find(object: string, query: any) { + if (object === 'sys_notification_preference') { + return query?.where?.topic === 't' ? [prefRow] : []; + } + return []; + }, + async findOne() { return null; }, + async update() { return {}; }, + async delete() { return {}; }, + async count() { return 0; }, + async aggregate() { return []; }, + } as any; + + // Non-mandatory topic 't': email is muted → only inbox delivered. + const svc = new MessagingService({ logger: silentLogger(), getData: () => engine }); + const inbox = recordingChannel('inbox'); + const email = recordingChannel('email'); + svc.registerChannel(inbox.channel); + svc.registerChannel(email.channel); + const r1 = await svc.emit({ topic: 't', audience: ['user_1'], channels: ['inbox', 'email'], payload: { title: 'Hi' } }); + expect(inbox.seen).toHaveLength(1); + expect(email.seen).toHaveLength(0); // muted + expect(r1.delivered).toBe(1); + + // Same mute, but topic is mandatory → bypass → both channels delivered. + const mandatory = new MessagingService({ logger: silentLogger(), getData: () => engine, mandatoryTopics: ['t'] }); + const inbox2 = recordingChannel('inbox'); + const email2 = recordingChannel('email'); + mandatory.registerChannel(inbox2.channel); + mandatory.registerChannel(email2.channel); + const r2 = await mandatory.emit({ topic: 't', audience: ['user_1'], channels: ['inbox', 'email'], payload: { title: 'Hi' } }); + expect(inbox2.seen).toHaveLength(1); + expect(email2.seen).toHaveLength(1); // mandatory bypass + expect(r2.delivered).toBe(2); + }); + it('reports a failed delivery per recipient when a channel is unregistered, without throwing', async () => { const result = await service.emit({ topic: 't', diff --git a/packages/services/service-messaging/src/messaging-service.ts b/packages/services/service-messaging/src/messaging-service.ts index a73a034eb..f1b82b129 100644 --- a/packages/services/service-messaging/src/messaging-service.ts +++ b/packages/services/service-messaging/src/messaging-service.ts @@ -7,6 +7,7 @@ import type { Notification, } from './channel.js'; import { RecipientResolver } from './recipient-resolver.js'; +import { PreferenceResolver, type PreferenceTarget } from './preference-resolver.js'; import type { INotificationOutbox } from './outbox.js'; /** The L2 event object every `emit()` writes one row to (ADR-0030). */ @@ -84,6 +85,14 @@ export interface MessagingServiceContext extends MessagingChannelContext { now?(): string; /** Override the recipient resolver (tests). Defaults to a data-backed one. */ recipientResolver?: RecipientResolver; + /** Override the preference resolver (tests). Defaults to a data-backed one. */ + preferenceResolver?: PreferenceResolver; + /** + * Topics that bypass the per-user preference matrix (ADR-0030 P2) — e.g. + * security/system alerts users must not be able to mute. Exact match, or a + * `prefix.` entry for prefix match. + */ + mandatoryTopics?: readonly string[]; /** * Durable delivery outbox (ADR-0030 P1). When present, `emit()` enqueues a * `pending` delivery row per `(recipient × channel)` and the @@ -114,6 +123,7 @@ export class MessagingService { private readonly channels = new Map(); private readonly now: () => string; private readonly resolver: RecipientResolver; + private readonly preferences: PreferenceResolver; private outbox?: INotificationOutbox; constructor(private readonly ctx: MessagingServiceContext) { @@ -121,6 +131,13 @@ export class MessagingService { this.resolver = ctx.recipientResolver ?? new RecipientResolver({ getData: () => ctx.getData?.(), logger: ctx.logger }); + this.preferences = + ctx.preferenceResolver ?? + new PreferenceResolver({ + getData: () => ctx.getData?.(), + logger: ctx.logger, + mandatoryTopics: ctx.mandatoryTopics, + }); this.outbox = ctx.outbox; } @@ -192,19 +209,22 @@ export class MessagingService { return { notificationId, deduped: false, deliveries: [], delivered: 0, failed: 0 }; } - // 4) Either enqueue durable deliveries (P1 outbox) or fan out inline (P0). + // 3b) Preference filter (ADR-0030 P2): drop the (recipient × channel) + // pairs the user muted. Mandatory topics bypass; fail-open on error. const payload = input.payload ?? {}; const channels = input.channels?.length ? input.channels : ['inbox']; + const targets = await this.preferences.filter(recipients, channels, { + topic: input.topic, + organizationId: input.organizationId, + }); + if (targets.length === 0) { + this.ctx.logger.info(`[messaging] emit: topic '${input.topic}' suppressed for all recipients by preference`); + return { notificationId, deduped: false, deliveries: [], delivered: 0, failed: 0 }; + } + // 4) Either enqueue durable deliveries (P1 outbox) or fan out inline (P0). if (this.outbox) { - const deliveries = await this.enqueueDeliveries( - this.outbox, - notificationId, - recipients, - channels, - input, - payload, - ); + const deliveries = await this.enqueueDeliveries(this.outbox, notificationId, targets, input, payload); const delivered = deliveries.filter((d) => d.ok).length; return { notificationId, deduped: false, deliveries, delivered, failed: deliveries.length - delivered }; } @@ -222,7 +242,7 @@ export class MessagingService { payload: input.payload, }; - const { deliveries, delivered, failed } = await this.fanOut(notification, recipients); + const { deliveries, delivered, failed } = await this.fanOut(notification, targets); return { notificationId, deduped: false, deliveries, delivered, failed }; } @@ -235,8 +255,7 @@ export class MessagingService { private async enqueueDeliveries( outbox: INotificationOutbox, notificationId: string, - recipients: string[], - channels: string[], + targets: PreferenceTarget[], input: EmitInput, payload: Record, ): Promise { @@ -250,8 +269,8 @@ export class MessagingService { actionUrl: str(payload.url) ?? str(payload.actionUrl), }; const deliveries: DeliveryOutcome[] = []; - for (const channel of channels) { - for (const recipient of recipients) { + for (const { recipient, channels } of targets) { + for (const channel of channels) { try { const id = await outbox.enqueue({ notificationId, @@ -314,41 +333,32 @@ export class MessagingService { } /** - * Fan a notification out to its channels and recipients. Each - * `(channel, recipient)` pair becomes one `send()` call. An unregistered + * Fan a notification out to each recipient's accepted channels. Each + * `(recipient, channel)` pair becomes one `send()` call. An unregistered * channel, or a channel that throws, is reported as a failed delivery — it * never aborts the rest of the fan-out. */ private async fanOut( notification: Notification, - recipients: string[], + targets: PreferenceTarget[], ): Promise<{ deliveries: DeliveryOutcome[]; delivered: number; failed: number }> { - const channels = notification.channels?.length ? notification.channels : ['inbox']; const deliveries: DeliveryOutcome[] = []; - for (const channelId of channels) { - const channel = this.channels.get(channelId); - if (!channel) { - // Surface the gap per recipient so the caller sees who missed out. - for (const recipient of recipients) { + for (const { recipient, channels } of targets) { + for (const channelId of channels) { + const channel = this.channels.get(channelId); + if (!channel) { deliveries.push({ channel: channelId, recipient, ok: false, error: `channel '${channelId}' not registered`, }); + this.ctx.logger.warn(`[messaging] emit: channel '${channelId}' not registered`); + continue; } - this.ctx.logger.warn(`[messaging] emit: channel '${channelId}' not registered`); - continue; - } - - for (const recipient of recipients) { try { - const result = await channel.send(this.ctx, { - notification, - channel: channelId, - recipient, - }); + const result = await channel.send(this.ctx, { notification, channel: channelId, recipient }); deliveries.push({ channel: channelId, recipient, diff --git a/packages/services/service-messaging/src/objects/index.ts b/packages/services/service-messaging/src/objects/index.ts index 4605e406b..0c48cbf76 100644 --- a/packages/services/service-messaging/src/objects/index.ts +++ b/packages/services/service-messaging/src/objects/index.ts @@ -3,3 +3,5 @@ export { InboxMessage } from './inbox-message.object.js'; export { NotificationReceipt } from './notification-receipt.object.js'; export { NotificationDelivery } from './notification-delivery.object.js'; +export { NotificationPreference } from './notification-preference.object.js'; +export { NotificationSubscription } from './notification-subscription.object.js'; diff --git a/packages/services/service-messaging/src/objects/notification-preference.object.ts b/packages/services/service-messaging/src/objects/notification-preference.object.ts new file mode 100644 index 000000000..055da424a --- /dev/null +++ b/packages/services/service-messaging/src/objects/notification-preference.object.ts @@ -0,0 +1,85 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { ObjectSchema, Field } from '@objectstack/spec/data'; + +/** + * `sys_notification_preference` — per-user × topic × channel delivery toggle + * (ADR-0030 Layer 3). + * + * The mute/allow matrix the preference filter consults before fan-out. A row + * declares whether `user_id` wants `topic` on `channel`. Resolution is + * most-specific-wins with wildcards: + * + * (user, topic, channel) → (user, topic, *) → (user, *, channel) → + * (user, *, *) → ('*', topic, channel) → … → ('*', '*', '*') → default ON + * + * `user_id = '*'` rows are the **admin global default**; a real-user row + * **overrides** it. `topic = '*'` / `channel = '*'` are wildcards. Mandatory + * topics (configured on the service) bypass this object entirely. + * + * Belongs to `service-messaging` (owner of the delivery pipeline). + */ +export const NotificationPreference = ObjectSchema.create({ + name: 'sys_notification_preference', + label: 'Notification Preference', + pluralLabel: 'Notification Preferences', + icon: 'bell-ring', + isSystem: true, + managedBy: 'system', + description: 'Per-user × topic × channel notification toggle (mute/allow), with admin-global defaults.', + titleFormat: '{user_id} · {topic} · {channel}', + compactLayout: ['user_id', 'topic', 'channel', 'enabled', 'digest'], + + fields: { + id: Field.text({ label: 'Preference ID', required: true, readonly: true }), + + user_id: Field.text({ + label: 'User', + required: true, + searchable: true, + description: "Recipient user id, or '*' for the admin-global default.", + }), + + topic: Field.text({ + label: 'Topic', + required: true, + searchable: true, + defaultValue: '*', + description: "Notification topic, or '*' for all topics.", + }), + + channel: Field.text({ + label: 'Channel', + required: true, + defaultValue: '*', + description: "Channel id (inbox/email/push/…), or '*' for all channels.", + }), + + enabled: Field.boolean({ + label: 'Enabled', + defaultValue: true, + description: 'When false, this (user, topic, channel) is muted.', + }), + + digest: Field.select(['none', 'daily', 'weekly'], { + label: 'Digest', + required: false, + defaultValue: 'none', + description: 'Batch cadence (P3 digest middleware).', + }), + + quiet_hours: Field.json({ + label: 'Quiet Hours', + required: false, + description: 'Optional { tz, start, end } window (P3 quiet-hours middleware).', + }), + + created_at: Field.datetime({ label: 'Created At', readonly: true }), + updated_at: Field.datetime({ label: 'Updated At', required: false }), + }, + + indexes: [ + { fields: ['user_id', 'topic', 'channel'], unique: true }, + { fields: ['topic'] }, + ], +}); diff --git a/packages/services/service-messaging/src/objects/notification-subscription.object.ts b/packages/services/service-messaging/src/objects/notification-subscription.object.ts new file mode 100644 index 000000000..84d50bbe6 --- /dev/null +++ b/packages/services/service-messaging/src/objects/notification-subscription.object.ts @@ -0,0 +1,61 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { ObjectSchema, Field } from '@objectstack/spec/data'; + +/** + * `sys_notification_subscription` — who is subscribed to a topic (ADR-0030 + * Layer 3). + * + * Declares standing interest in a `topic` by a `principal` (`role:x`, `team:x`, + * `user:id`, or a bare user id). Where a producer emits with `audience: + * 'subscribers'` (or no explicit audience), the resolver expands the topic's + * subscriptions into recipients — the opt-in counterpart to the explicit + * audience most producers pass today. + * + * Distinct from `sys_notification_preference`: a subscription says "include me + * for this topic"; a preference says "but mute it on this channel". + * + * Belongs to `service-messaging`. + */ +export const NotificationSubscription = ObjectSchema.create({ + name: 'sys_notification_subscription', + label: 'Notification Subscription', + pluralLabel: 'Notification Subscriptions', + icon: 'rss', + isSystem: true, + managedBy: 'system', + description: 'Standing subscription of a principal (role/team/user) to a notification topic.', + titleFormat: '{principal} · {topic}', + compactLayout: ['topic', 'principal', 'enabled', 'created_at'], + + fields: { + id: Field.text({ label: 'Subscription ID', required: true, readonly: true }), + + topic: Field.text({ + label: 'Topic', + required: true, + searchable: true, + description: 'Notification topic this principal subscribes to.', + }), + + principal: Field.text({ + label: 'Principal', + required: true, + searchable: true, + description: "Subscriber selector: 'role:x' | 'team:x' | 'user:id' | bare user id.", + }), + + enabled: Field.boolean({ + label: 'Enabled', + defaultValue: true, + description: 'When false, the subscription is inactive.', + }), + + created_at: Field.datetime({ label: 'Created At', readonly: true }), + }, + + indexes: [ + { fields: ['topic', 'principal'], unique: true }, + { fields: ['topic'] }, + ], +}); diff --git a/packages/services/service-messaging/src/preference-resolver.test.ts b/packages/services/service-messaging/src/preference-resolver.test.ts new file mode 100644 index 000000000..5c2cfae1e --- /dev/null +++ b/packages/services/service-messaging/src/preference-resolver.test.ts @@ -0,0 +1,143 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect } from 'vitest'; +import { PreferenceResolver } from './preference-resolver.js'; + +function silentLogger() { + return { info: () => {}, warn: () => {}, error: () => {} }; +} + +/** + * Fake data engine answering `find('sys_notification_preference', { where })` + * from an in-memory row list, filtered by the query's `topic` (and org). + */ +function fakeData(rows: any[] = [], opts: { throwOnFind?: boolean } = {}) { + const queries: any[] = []; + return { + queries, + engine: { + async find(object: string, query: any) { + queries.push({ object, where: query?.where }); + if (opts.throwOnFind) throw new Error('pref table locked'); + const w = query?.where ?? {}; + return rows.filter( + (r) => + r.topic === w.topic && + (w.organization_id == null || r.organization_id === w.organization_id), + ); + }, + async findOne() { return null; }, + async insert(_o: string, r: any) { return { id: 'x', ...r }; }, + async update() { return {}; }, + async delete() { return {}; }, + async count() { return 0; }, + async aggregate() { return []; }, + } as any, + }; +} + +function pref(over: Partial<{ user_id: string; topic: string; channel: string; enabled: boolean; organization_id: string }>) { + return { user_id: '*', topic: '*', channel: '*', enabled: true, ...over }; +} + +function resolver(getData: () => any, mandatoryTopics: string[] = []) { + return new PreferenceResolver({ getData, logger: silentLogger(), mandatoryTopics }); +} + +describe('PreferenceResolver', () => { + it('fails open (all channels) when there is no data engine', async () => { + const r = resolver(() => undefined); + const out = await r.filter(['u1', 'u2'], ['inbox', 'email'], { topic: 'task.assigned' }); + expect(out).toEqual([ + { recipient: 'u1', channels: ['inbox', 'email'] }, + { recipient: 'u2', channels: ['inbox', 'email'] }, + ]); + }); + + it('returns [] for empty recipients or channels', async () => { + const r = resolver(() => fakeData().engine); + expect(await r.filter([], ['inbox'], { topic: 't' })).toEqual([]); + expect(await r.filter(['u1'], [], { topic: 't' })).toEqual([]); + }); + + it('defaults every (recipient, channel) ON when no rows exist', async () => { + const r = resolver(() => fakeData([]).engine); + const out = await r.filter(['u1'], ['inbox', 'email'], { topic: 'task.assigned' }); + expect(out).toEqual([{ recipient: 'u1', channels: ['inbox', 'email'] }]); + }); + + it('drops a single channel a user muted, keeping the others', async () => { + const rows = [pref({ user_id: 'u1', topic: 'task.assigned', channel: 'email', enabled: false })]; + const r = resolver(() => fakeData(rows).engine); + const out = await r.filter(['u1'], ['inbox', 'email'], { topic: 'task.assigned' }); + expect(out).toEqual([{ recipient: 'u1', channels: ['inbox'] }]); + }); + + it('drops a recipient entirely when they mute the topic on all channels (channel "*")', async () => { + const rows = [pref({ user_id: 'u1', topic: 'task.assigned', channel: '*', enabled: false })]; + const r = resolver(() => fakeData(rows).engine); + const out = await r.filter(['u1', 'u2'], ['inbox', 'email'], { topic: 'task.assigned' }); + expect(out).toEqual([{ recipient: 'u2', channels: ['inbox', 'email'] }]); + }); + + it('lets a per-user row override the admin-global default', async () => { + const rows = [ + pref({ user_id: '*', topic: 'task.assigned', channel: 'email', enabled: false }), // global: email off + pref({ user_id: 'u1', topic: 'task.assigned', channel: 'email', enabled: true }), // u1 opts back in + ]; + const r = resolver(() => fakeData(rows).engine); + const out = await r.filter(['u1', 'u2'], ['inbox', 'email'], { topic: 'task.assigned' }); + // u1 re-enabled email; u2 inherits the global mute (email dropped). + expect(out).toEqual([ + { recipient: 'u1', channels: ['inbox', 'email'] }, + { recipient: 'u2', channels: ['inbox'] }, + ]); + }); + + it('prefers the most specific row (topic+channel beats topic-wildcard)', async () => { + const rows = [ + pref({ user_id: 'u1', topic: 'task.assigned', channel: '*', enabled: false }), // mute all channels… + pref({ user_id: 'u1', topic: 'task.assigned', channel: 'inbox', enabled: true }), // …except inbox + ]; + const r = resolver(() => fakeData(rows).engine); + const out = await r.filter(['u1'], ['inbox', 'email'], { topic: 'task.assigned' }); + expect(out).toEqual([{ recipient: 'u1', channels: ['inbox'] }]); + }); + + it('honours a wildcard-topic preference row', async () => { + const rows = [pref({ user_id: 'u1', topic: '*', channel: 'email', enabled: false })]; + const r = resolver(() => fakeData(rows).engine); + const out = await r.filter(['u1'], ['inbox', 'email'], { topic: 'anything.at.all' }); + expect(out).toEqual([{ recipient: 'u1', channels: ['inbox'] }]); + }); + + it('bypasses preferences for a mandatory topic (exact match) even when muted', async () => { + const rows = [pref({ user_id: 'u1', topic: 'security.breach', channel: '*', enabled: false })]; + const r = resolver(() => fakeData(rows).engine, ['security.breach']); + const out = await r.filter(['u1'], ['inbox', 'email'], { topic: 'security.breach' }); + expect(out).toEqual([{ recipient: 'u1', channels: ['inbox', 'email'] }]); + }); + + it('bypasses preferences for a mandatory topic prefix', async () => { + const r = resolver(() => fakeData([pref({ user_id: 'u1', topic: '*', channel: '*', enabled: false })]).engine, ['security.']); + const out = await r.filter(['u1'], ['inbox'], { topic: 'security.mfa_disabled' }); + expect(out).toEqual([{ recipient: 'u1', channels: ['inbox'] }]); + expect(r.isMandatory('security.mfa_disabled')).toBe(true); + expect(r.isMandatory('task.assigned')).toBe(false); + }); + + it('fails open when the preference lookup throws', async () => { + const r = resolver(() => fakeData([], { throwOnFind: true }).engine); + const out = await r.filter(['u1'], ['inbox', 'email'], { topic: 'task.assigned' }); + expect(out).toEqual([{ recipient: 'u1', channels: ['inbox', 'email'] }]); + }); + + it('scopes the lookup to the organization when provided', async () => { + const data = fakeData([ + pref({ user_id: 'u1', topic: 'task.assigned', channel: 'email', enabled: false, organization_id: 'org_1' }), + ]); + const r = resolver(() => data.engine); + await r.filter(['u1'], ['inbox', 'email'], { topic: 'task.assigned', organizationId: 'org_1' }); + expect(data.queries.every((q) => q.where.organization_id === 'org_1')).toBe(true); + }); +}); diff --git a/packages/services/service-messaging/src/preference-resolver.ts b/packages/services/service-messaging/src/preference-resolver.ts new file mode 100644 index 000000000..9a9987681 --- /dev/null +++ b/packages/services/service-messaging/src/preference-resolver.ts @@ -0,0 +1,154 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import type { IDataEngine } from '@objectstack/spec/contracts'; + +/** The object the preference matrix lives in. */ +export const PREFERENCE_OBJECT = 'sys_notification_preference'; + +export interface PreferenceResolverLogger { + info(...args: unknown[]): void; + warn(...args: unknown[]): void; + error(...args: unknown[]): void; +} + +export interface PreferenceResolverOptions { + /** Lazily resolve the data engine; `undefined` ⇒ fail-open (deliver all). */ + getData(): IDataEngine | undefined; + logger: PreferenceResolverLogger; + /** + * Topics that bypass preferences entirely (security/system alerts users + * must not be able to mute). An entry ending in `.` is a prefix match + * (`security.` matches `security.breach`); otherwise it is an exact match. + */ + mandatoryTopics?: readonly string[]; + /** Object name override (default {@link PREFERENCE_OBJECT}). */ + objectName?: string; +} + +export interface PreferenceContext { + topic: string; + organizationId?: string; +} + +/** A recipient with the channels they accept for this notification. */ +export interface PreferenceTarget { + recipient: string; + channels: string[]; +} + +const WILDCARD = '*'; + +/** + * PreferenceResolver — the ADR-0030 Layer-3 preference filter (P2). + * + * Given the resolved recipients and the requested channels, returns, per + * recipient, the channels they actually accept for `topic`. Resolution is + * most-specific-wins over `sys_notification_preference` rows with `*` wildcards + * for user / topic / channel; a real-user row overrides the `user_id='*'` + * admin-global default; the built-in default is **on**. + * + * Two safety rules: + * - **Mandatory topics bypass** the matrix (all channels kept). + * - **Fail-open**: no data engine, or a lookup error, keeps all channels — a + * preference outage must never silently swallow notifications. + */ +export class PreferenceResolver { + private readonly objectName: string; + private readonly mandatory: readonly string[]; + + constructor(private readonly opts: PreferenceResolverOptions) { + this.objectName = opts.objectName ?? PREFERENCE_OBJECT; + this.mandatory = opts.mandatoryTopics ?? []; + } + + /** Whether a topic bypasses preferences (exact or `prefix.` match). */ + isMandatory(topic: string): boolean { + return this.mandatory.some((m) => + m.endsWith('.') ? topic.startsWith(m) : topic === m, + ); + } + + /** + * Filter `(recipient × channel)` by preference. Recipients left with no + * accepted channel are dropped from the result. + */ + async filter( + recipients: string[], + channels: string[], + ctx: PreferenceContext, + ): Promise { + const all = (): PreferenceTarget[] => recipients.map((r) => ({ recipient: r, channels: [...channels] })); + if (recipients.length === 0 || channels.length === 0) return []; + if (this.isMandatory(ctx.topic)) return all(); + + const data = this.opts.getData(); + if (!data) return all(); // fail-open + + let rows: Record[]; + try { + rows = await this.loadRows(data, ctx); + } catch (err) { + this.opts.logger.warn( + `[preferences] lookup for topic '${ctx.topic}' failed (${msg(err)}); delivering all (fail-open)`, + ); + return all(); + } + + // Index rows by `${user}|${topic}|${channel}` → enabled. + const recipientSet = new Set(recipients); + const index = new Map(); + for (const r of rows) { + const user = String(r.user_id ?? ''); + if (user !== WILDCARD && !recipientSet.has(user)) continue; // ignore unrelated users + const topic = String(r.topic ?? WILDCARD); + const channel = String(r.channel ?? WILDCARD); + index.set(`${user}|${topic}|${channel}`, asBool(r.enabled)); + } + + const targets: PreferenceTarget[] = []; + for (const recipient of recipients) { + const accepted = channels.filter((channel) => + this.enabledFor(index, recipient, ctx.topic, channel), + ); + if (accepted.length > 0) targets.push({ recipient, channels: accepted }); + } + return targets; + } + + /** Load the candidate rows (topic-specific + wildcard-topic), org-scoped. */ + private async loadRows(data: IDataEngine, ctx: PreferenceContext): Promise[]> { + // Two equality queries (topic and the '*' wildcard) avoid relying on + // driver-specific IN support; user filtering is done in memory. + const base: Record = {}; + if (ctx.organizationId) base.organization_id = ctx.organizationId; + const [specific, wildcard] = await Promise.all([ + data.find(this.objectName, { where: { ...base, topic: ctx.topic }, limit: 10000 }), + data.find(this.objectName, { where: { ...base, topic: WILDCARD }, limit: 10000 }), + ]); + return [...(specific ?? []), ...(wildcard ?? [])]; + } + + /** + * Most-specific-wins lookup for (user, topic, channel). User-specific beats + * the `*` user; topic/channel specific beats their wildcards. Default on. + */ + private enabledFor(index: Map, user: string, topic: string, channel: string): boolean { + for (const u of [user, WILDCARD]) { + for (const t of [topic, WILDCARD]) { + for (const c of [channel, WILDCARD]) { + const hit = index.get(`${u}|${t}|${c}`); + if (hit !== undefined) return hit; + } + } + } + return true; // built-in default: opted in + } +} + +function asBool(v: unknown): boolean { + return v === true || v === 1 || v === '1' || v === 'true'; +} + +function msg(err: unknown): string { + return (err as Error)?.message ?? String(err); +}