Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .changeset/adr-0030-notification-p2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
"@objectstack/service-messaging": minor
---

ADR-0030 P2 — subscription + preference. Adds the Layer-3 preference filter so
users can mute notification topics/channels, with admin-global defaults and
mandatory-topic bypass.

- **`sys_notification_preference`** — per `(user_id, topic, channel)` toggle
(`enabled`, plus `digest`/`quiet_hours` for P3). `user_id='*'` rows are the
admin-global default; a real-user row overrides it; `topic`/`channel` support
`*` wildcards. Unique `(user_id, topic, channel)`.
- **`sys_notification_subscription`** — standing subscription of a principal
(`role:`/`team:`/`user:`/id) to a topic (the opt-in counterpart to explicit
audience; object + schema land now, subscription-driven fan-out is a follow-up).
- **`PreferenceResolver`** — wired into `MessagingService.emit()` between
recipient resolution and fan-out/enqueue. Most-specific-wins resolution
(user→`*`, topic→`*`, channel→`*`; default ON). Two safety rules: **mandatory
topics bypass** (configurable via `mandatoryTopics`, exact or `prefix.`), and
**fail-open** (no data engine or a lookup error delivers all, never silently
drops). `emit()` now filters the `(recipient × channel)` matrix per user.
- Both objects are registered by `MessagingServicePlugin` and contributed to the
Setup app's Configuration nav slot (ADR-0029 D7), so they appear in
REST/Studio only when messaging is installed.

Acceptance: a user muting a topic/channel stops receiving it on that channel;
mandatory topics still deliver. service-messaging suite: 66 passing
(adds `preference-resolver.test.ts` + an emit-level mute/bypass test).
27 changes: 18 additions & 9 deletions docs/handoff/adr-0030-notification-convergence.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,24 @@ flipped — the inbox is being populated the whole time.)

---

## Remaining phases (from the build spec)

- **P1 — Reliable delivery**: `sys_notification_delivery` outbox + dispatcher
(state machine, retry/backoff, dead-letter, `dedup_key`); `RecipientResolver`
(reuse sharing/CEL resolver) owning `role:`/`owner_of:`/`team:`/email→id. Move
the inbox channel's email→id fallback up here.
- **P2 — Subscription + preference**: `sys_notification_subscription` +
`sys_notification_preference` objects + Studio config UI; mandatory-topic
bypass; admin-global + per-user-override defaults.
## Phase status (from the build spec)

- **P0 — Seams**: ✅ shipped (#1434). Single ingress, event re-model, receipt,
producers routed through `emit()`. (objectui bell cut-over + mark-read write
path still pending — see above.)
- **P1 — Reliable delivery**: ✅ shipped (#1441). `sys_notification_delivery`
outbox + `NotificationDispatcher` (state machine, retry/backoff, dead-letter);
`RecipientResolver` owns `role:`/`owner_of:`/`team:`/email→id (the inbox
channel's email→id fallback moved up). So the audience-selector caveat above is
now resolved when a data engine is present.
- **P2 — Subscription + preference**: ✅ shipped. `sys_notification_preference`
(per user×topic×channel toggle, admin-global `*` defaults + per-user override,
wildcards) + `sys_notification_subscription`; `PreferenceResolver` wired into
`emit()` (most-specific-wins, **mandatory-topic bypass**, **fail-open**); both
objects contributed to the Setup Configuration nav.
- *Follow-ups*: subscription-driven fan-out (expand a topic's subscribers when
a producer emits without an explicit audience) is schema-only so far;
`digest`/`quiet_hours` fields exist but the batching middleware is P3.
- **P3 — Channels + templates + digest**: email/push/webhook/Slack channels on
connectors (ADR-0022); `sys_notification_template` (topic×channel×locale) +
renderer; digest / quiet-hours middleware.
17 changes: 16 additions & 1 deletion packages/services/service-messaging/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ export type {
ResolveContext,
} from './recipient-resolver.js';

// Preference filter (ADR-0030 P2)
export { PreferenceResolver, PREFERENCE_OBJECT } from './preference-resolver.js';
export type {
PreferenceResolverOptions,
PreferenceResolverLogger,
PreferenceContext,
PreferenceTarget,
} from './preference-resolver.js';

// Channel seam
export { createInboxChannel, INBOX_OBJECT, RECEIPT_OBJECT } from './inbox-channel.js';
export type { InboxChannelOptions } from './inbox-channel.js';
Expand Down Expand Up @@ -77,4 +86,10 @@ export type {
} from './dispatcher.js';

// Objects (metadata definitions)
export { InboxMessage, NotificationReceipt, NotificationDelivery } from './objects/index.js';
export {
InboxMessage,
NotificationReceipt,
NotificationDelivery,
NotificationPreference,
NotificationSubscription,
} from './objects/index.js';
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ import { MessagingService } from './messaging-service.js';
import { createInboxChannel } from './inbox-channel.js';
import { SqlNotificationOutbox } from './sql-outbox.js';
import { NotificationDispatcher, type DispatchCluster } from './dispatcher.js';
import { InboxMessage, NotificationReceipt, NotificationDelivery } from './objects/index.js';
import {
InboxMessage,
NotificationReceipt,
NotificationDelivery,
NotificationPreference,
NotificationSubscription,
} from './objects/index.js';

export interface MessagingServicePluginOptions {
/**
Expand All @@ -25,6 +31,12 @@ export interface MessagingServicePluginOptions {
partitionCount?: number;
/** Dispatcher tick interval in ms (default 500). */
dispatchIntervalMs?: number;
/**
* Topics that bypass the per-user preference matrix (ADR-0030 P2) — e.g.
* security/system alerts users must not be able to mute. Exact match, or a
* `prefix.` entry for a prefix match (default none).
*/
mandatoryTopics?: readonly string[];
}

/**
Expand Down Expand Up @@ -65,6 +77,7 @@ export class MessagingServicePlugin implements Plugin {
reliableDelivery: true,
partitionCount: 8,
dispatchIntervalMs: 500,
mandatoryTopics: [],
...options,
};
}
Expand All @@ -82,22 +95,46 @@ export class MessagingServicePlugin implements Plugin {
}
};

const service = new MessagingService({ logger: ctx.logger, getData });
const service = new MessagingService({
logger: ctx.logger,
getData,
mandatoryTopics: this.options.mandatoryTopics,
});

if (this.options.registerInbox) {
service.registerChannel(createInboxChannel({ getData }));
}

ctx.registerService('messaging', service);

// Register the messaging objects so their rows can be written.
// Register the messaging objects so their rows can be written. The
// preference/subscription objects (ADR-0030 P2) are Studio-configurable,
// so contribute them to the Setup app's Configuration slot (ADR-0029 D7)
// — they appear in nav only when this plugin is installed.
ctx.getService<{ register(m: unknown): void }>('manifest').register({
id: 'com.objectstack.service.messaging',
name: 'Messaging Service',
version: '1.0.0',
type: 'plugin',
scope: 'system',
objects: [InboxMessage, NotificationReceipt, NotificationDelivery],
objects: [
InboxMessage,
NotificationReceipt,
NotificationDelivery,
NotificationPreference,
NotificationSubscription,
],
navigationContributions: [
{
app: 'setup',
group: 'group_configuration',
priority: 120,
items: [
{ id: 'nav_notification_preferences', type: 'object', label: 'Notification Preferences', objectName: 'sys_notification_preference', icon: 'bell-ring', requiresObject: 'sys_notification_preference' },
{ id: 'nav_notification_subscriptions', type: 'object', label: 'Notification Subscriptions', objectName: 'sys_notification_subscription', icon: 'rss', requiresObject: 'sys_notification_subscription' },
],
},
],
});

// Reliable delivery (P1): wire the outbox + dispatcher once the engine
Expand Down
41 changes: 41 additions & 0 deletions packages/services/service-messaging/src/messaging-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,47 @@ describe('MessagingService', () => {
expect(result.delivered).toBe(2);
});

it('applies the preference filter — a muted channel is dropped, a mandatory topic bypasses it', async () => {
// Engine returns a preference muting `email` for user_1 on topic 't'.
const prefRow = { user_id: 'user_1', topic: 't', channel: 'email', enabled: false };
const engine = {
async insert(_o: string, row: any) { return { id: 'evt_1', ...row }; },
async find(object: string, query: any) {
if (object === 'sys_notification_preference') {
return query?.where?.topic === 't' ? [prefRow] : [];
}
return [];
},
async findOne() { return null; },
async update() { return {}; },
async delete() { return {}; },
async count() { return 0; },
async aggregate() { return []; },
} as any;

// Non-mandatory topic 't': email is muted → only inbox delivered.
const svc = new MessagingService({ logger: silentLogger(), getData: () => engine });
const inbox = recordingChannel('inbox');
const email = recordingChannel('email');
svc.registerChannel(inbox.channel);
svc.registerChannel(email.channel);
const r1 = await svc.emit({ topic: 't', audience: ['user_1'], channels: ['inbox', 'email'], payload: { title: 'Hi' } });
expect(inbox.seen).toHaveLength(1);
expect(email.seen).toHaveLength(0); // muted
expect(r1.delivered).toBe(1);

// Same mute, but topic is mandatory → bypass → both channels delivered.
const mandatory = new MessagingService({ logger: silentLogger(), getData: () => engine, mandatoryTopics: ['t'] });
const inbox2 = recordingChannel('inbox');
const email2 = recordingChannel('email');
mandatory.registerChannel(inbox2.channel);
mandatory.registerChannel(email2.channel);
const r2 = await mandatory.emit({ topic: 't', audience: ['user_1'], channels: ['inbox', 'email'], payload: { title: 'Hi' } });
expect(inbox2.seen).toHaveLength(1);
expect(email2.seen).toHaveLength(1); // mandatory bypass
expect(r2.delivered).toBe(2);
});

it('reports a failed delivery per recipient when a channel is unregistered, without throwing', async () => {
const result = await service.emit({
topic: 't',
Expand Down
76 changes: 43 additions & 33 deletions packages/services/service-messaging/src/messaging-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
Notification,
} from './channel.js';
import { RecipientResolver } from './recipient-resolver.js';
import { PreferenceResolver, type PreferenceTarget } from './preference-resolver.js';
import type { INotificationOutbox } from './outbox.js';

/** The L2 event object every `emit()` writes one row to (ADR-0030). */
Expand Down Expand Up @@ -84,6 +85,14 @@ export interface MessagingServiceContext extends MessagingChannelContext {
now?(): string;
/** Override the recipient resolver (tests). Defaults to a data-backed one. */
recipientResolver?: RecipientResolver;
/** Override the preference resolver (tests). Defaults to a data-backed one. */
preferenceResolver?: PreferenceResolver;
/**
* Topics that bypass the per-user preference matrix (ADR-0030 P2) — e.g.
* security/system alerts users must not be able to mute. Exact match, or a
* `prefix.` entry for prefix match.
*/
mandatoryTopics?: readonly string[];
/**
* Durable delivery outbox (ADR-0030 P1). When present, `emit()` enqueues a
* `pending` delivery row per `(recipient × channel)` and the
Expand Down Expand Up @@ -114,13 +123,21 @@ export class MessagingService {
private readonly channels = new Map<string, MessagingChannel>();
private readonly now: () => string;
private readonly resolver: RecipientResolver;
private readonly preferences: PreferenceResolver;
private outbox?: INotificationOutbox;

constructor(private readonly ctx: MessagingServiceContext) {
this.now = ctx.now ?? (() => new Date().toISOString());
this.resolver =
ctx.recipientResolver ??
new RecipientResolver({ getData: () => ctx.getData?.(), logger: ctx.logger });
this.preferences =
ctx.preferenceResolver ??
new PreferenceResolver({
getData: () => ctx.getData?.(),
logger: ctx.logger,
mandatoryTopics: ctx.mandatoryTopics,
});
this.outbox = ctx.outbox;
}

Expand Down Expand Up @@ -192,19 +209,22 @@ export class MessagingService {
return { notificationId, deduped: false, deliveries: [], delivered: 0, failed: 0 };
}

// 4) Either enqueue durable deliveries (P1 outbox) or fan out inline (P0).
// 3b) Preference filter (ADR-0030 P2): drop the (recipient × channel)
// pairs the user muted. Mandatory topics bypass; fail-open on error.
const payload = input.payload ?? {};
const channels = input.channels?.length ? input.channels : ['inbox'];
const targets = await this.preferences.filter(recipients, channels, {
topic: input.topic,
organizationId: input.organizationId,
});
if (targets.length === 0) {
this.ctx.logger.info(`[messaging] emit: topic '${input.topic}' suppressed for all recipients by preference`);
return { notificationId, deduped: false, deliveries: [], delivered: 0, failed: 0 };
}

// 4) Either enqueue durable deliveries (P1 outbox) or fan out inline (P0).
if (this.outbox) {
const deliveries = await this.enqueueDeliveries(
this.outbox,
notificationId,
recipients,
channels,
input,
payload,
);
const deliveries = await this.enqueueDeliveries(this.outbox, notificationId, targets, input, payload);
const delivered = deliveries.filter((d) => d.ok).length;
return { notificationId, deduped: false, deliveries, delivered, failed: deliveries.length - delivered };
}
Expand All @@ -222,7 +242,7 @@ export class MessagingService {
payload: input.payload,
};

const { deliveries, delivered, failed } = await this.fanOut(notification, recipients);
const { deliveries, delivered, failed } = await this.fanOut(notification, targets);
return { notificationId, deduped: false, deliveries, delivered, failed };
}

Expand All @@ -235,8 +255,7 @@ export class MessagingService {
private async enqueueDeliveries(
outbox: INotificationOutbox,
notificationId: string,
recipients: string[],
channels: string[],
targets: PreferenceTarget[],
input: EmitInput,
payload: Record<string, unknown>,
): Promise<DeliveryOutcome[]> {
Expand All @@ -250,8 +269,8 @@ export class MessagingService {
actionUrl: str(payload.url) ?? str(payload.actionUrl),
};
const deliveries: DeliveryOutcome[] = [];
for (const channel of channels) {
for (const recipient of recipients) {
for (const { recipient, channels } of targets) {
for (const channel of channels) {
try {
const id = await outbox.enqueue({
notificationId,
Expand Down Expand Up @@ -314,41 +333,32 @@ export class MessagingService {
}

/**
* Fan a notification out to its channels and recipients. Each
* `(channel, recipient)` pair becomes one `send()` call. An unregistered
* Fan a notification out to each recipient's accepted channels. Each
* `(recipient, channel)` pair becomes one `send()` call. An unregistered
* channel, or a channel that throws, is reported as a failed delivery — it
* never aborts the rest of the fan-out.
*/
private async fanOut(
notification: Notification,
recipients: string[],
targets: PreferenceTarget[],
): Promise<{ deliveries: DeliveryOutcome[]; delivered: number; failed: number }> {
const channels = notification.channels?.length ? notification.channels : ['inbox'];
const deliveries: DeliveryOutcome[] = [];

for (const channelId of channels) {
const channel = this.channels.get(channelId);
if (!channel) {
// Surface the gap per recipient so the caller sees who missed out.
for (const recipient of recipients) {
for (const { recipient, channels } of targets) {
for (const channelId of channels) {
const channel = this.channels.get(channelId);
if (!channel) {
deliveries.push({
channel: channelId,
recipient,
ok: false,
error: `channel '${channelId}' not registered`,
});
this.ctx.logger.warn(`[messaging] emit: channel '${channelId}' not registered`);
continue;
}
this.ctx.logger.warn(`[messaging] emit: channel '${channelId}' not registered`);
continue;
}

for (const recipient of recipients) {
try {
const result = await channel.send(this.ctx, {
notification,
channel: channelId,
recipient,
});
const result = await channel.send(this.ctx, { notification, channel: channelId, recipient });
deliveries.push({
channel: channelId,
recipient,
Expand Down
2 changes: 2 additions & 0 deletions packages/services/service-messaging/src/objects/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
export { InboxMessage } from './inbox-message.object.js';
export { NotificationReceipt } from './notification-receipt.object.js';
export { NotificationDelivery } from './notification-delivery.object.js';
export { NotificationPreference } from './notification-preference.object.js';
export { NotificationSubscription } from './notification-subscription.object.js';
Loading