diff --git a/.changeset/adr-0030-notification-convergence-p0.md b/.changeset/adr-0030-notification-convergence-p0.md new file mode 100644 index 000000000..e8aee889d --- /dev/null +++ b/.changeset/adr-0030-notification-convergence-p0.md @@ -0,0 +1,57 @@ +--- +"@objectstack/service-messaging": minor +"@objectstack/platform-objects": minor +"@objectstack/plugin-audit": minor +"@objectstack/service-automation": minor +"@objectstack/metadata": minor +"@objectstack/cli": patch +"@objectstack/runtime": patch +--- + +ADR-0030 P0 (framework) — converge notifications onto a single ingress and the +layered model. Every producer now publishes through +`NotificationService.emit(EmitInput)`; the in-app inbox is a materialization of +delivery, not a row producers write. + +**Single ingress (`@objectstack/service-messaging`) — breaking** +- `MessagingService.emit` takes the new `EmitInput` contract (`topic` / + `audience` / `payload` / `severity` / `dedupKey` / `source` / `actorId` / + `organizationId` / `channels`) instead of the flat `Notification` shape. It + writes the L2 `sys_notification` event (idempotent on `dedupKey`), resolves the + audience, then fans out; it returns `{ notificationId, deduped, deliveries, + delivered, failed }`. +- New `sys_notification_receipt` object — the read-state spine + (`delivered|read|clicked|dismissed`), keyed `(notification_id, user_id, + channel)`. The inbox channel writes a `delivered` receipt on materialization. +- `sys_inbox_message`: adds `notification_id` / `delivery_id`, **drops `read`** + (read-state moved to the receipt), adds the user `mine` list view. + +**Event re-model (`@objectstack/platform-objects`) — breaking** +- `sys_notification` is re-modeled from a per-user inbox into the L2 **event** + (`topic`, `payload`, `severity`, `dedup_key`, `source_*`, `actor_id`). Removes + `recipient_id` / `is_read` / `read_at` / `type` / `title` / `body` / `url` / + `actor_name` and the inbox actions/views. App-nav: the account inbox points at + `sys_inbox_message`; Setup shows the notification event log. + +**Producers routed through `emit()`** +- `@objectstack/service-automation`: the `notify` node maps its config to + `EmitInput`. +- `@objectstack/plugin-audit`: collaboration `@mention` → `collab.mention` and + assignment → `collab.assignment` (both with a `dedupKey`); no more direct + `sys_notification` writes. Collaboration notifications now require + `MessagingServicePlugin` (they degrade to a warn otherwise). + +**Migration (`@objectstack/metadata`)** +- Idempotent `migrateSysNotificationToEvent` splits legacy `sys_notification` + inbox rows into `sys_inbox_message` + receipts and rewrites the event row. + +**Startup (`@objectstack/cli`, `@objectstack/runtime`)** +- `messaging` is now a foundational capability. On `objectstack serve` it is + added to `ALWAYS_ON_CAPABILITIES` (every non-`minimal` preset starts it); on + cloud per-project kernels the capability loader expands `requires` to add + `messaging` whenever `audit` is present. This keeps collaboration `@mention` / + assignment notifications (which now flow through the pipeline) working out of + the box on both paths. `--preset minimal` opts out. + +The Console bell repoint (objectui) and phases P1–P3 are tracked in +`docs/handoff/adr-0030-notification-convergence.md`. diff --git a/docs/handoff/adr-0030-notification-convergence.md b/docs/handoff/adr-0030-notification-convergence.md new file mode 100644 index 000000000..f31d7b4eb --- /dev/null +++ b/docs/handoff/adr-0030-notification-convergence.md @@ -0,0 +1,155 @@ +# Handoff — ADR-0030 Notification Convergence (P0 framework side) + +**ADR**: [0030 — Notification Platform Convergence](../adr/0030-notification-platform-convergence.md) +**Build spec**: [notification-platform-convergence.md](../design/notification-platform-convergence.md) +**Status of this handoff**: P0 **framework side** shipped. The **objectui** (Console bell) cut-over and phases P1–P3 remain. Date: 2026-06-01. + +--- + +## What shipped in this repo (framework) + +The single-ingress seam and the correct layered model are now in place. Every +producer goes through `NotificationService.emit(EmitInput)`; no producer writes +a per-user inbox row directly. + +### Single ingress — `MessagingService.emit(EmitInput)` +`packages/services/service-messaging/src/messaging-service.ts` +- New public contract `EmitInput` (`topic`, `audience`, `payload`, `severity`, + `dedupKey`, `source`, `actorId`, `organizationId`, `channels`). +- `emit()` now: (1) writes the **L2 `sys_notification` event** (idempotent on + `dedupKey`), (2) resolves the audience to recipients (inline for explicit + ids/emails; `role:`/`team:`/`owner_of:` are forwarded but **deferred to P1**), + (3) fans out `(channel × recipient)` deliveries. Returns + `{ notificationId, deduped, deliveries, delivered, failed }`. +- The service now takes a `getData()` so it can persist the event. + +### L2 event — `sys_notification` re-modeled (destructive) +`packages/platform-objects/src/audit/sys-notification.object.ts` +- Now the **event**: `topic`, `payload` (json), `severity`, `dedup_key`, + `source_object`, `source_id`, `actor_id`, `created_at`. Indexes on + `(topic, created_at)`, `(dedup_key)`, `(source_object, source_id)`. +- **Removed**: `recipient_id`, `is_read`, `read_at`, `type`, `title`, `body`, + `url`, `actor_name`, plus the `mark_read`/`mark_unread` actions and the + recipient-filtered list views. New admin views: `recent`, `by_topic`. + +### L5 materialization + receipt +- `sys_inbox_message` (`.../objects/inbox-message.object.ts`): added + `notification_id` + `delivery_id` FKs; **dropped `read`** (read-state lives in + the receipt now); added a `mine` list view (the user inbox). +- **New** `sys_notification_receipt` (`.../objects/notification-receipt.object.ts`): + the read-state spine, keyed `(notification_id, user_id, channel)`, state + `delivered|read|clicked|dismissed`. The inbox channel writes a `delivered` + receipt on materialization (best-effort). +- `inbox-channel.ts`: writes `notification_id` + `organization_id`, no `read` + flag, and the `delivered` receipt. Email→id fallback kept (moves up to the + `RecipientResolver` in P1). + +### Producers re-routed through `emit()` +- **Flow `notify` node** (`service-automation/.../notify-node.ts`): maps config → + `EmitInput` (title/body/url ride in `payload`). +- **Collaboration** (`plugin-audit/src/audit-writers.ts`): `@mention` → + `emit('collab.mention')`, assignment → `emit('collab.assignment')`, both with + a `dedupKey`. No more direct `sys_notification` writes. The plugin resolves the + `messaging` service lazily at hook time (`audit-plugin.ts`). + +### Data migration (not auto-run) +`packages/metadata/src/migrations/migrate-sys-notification-to-event.ts` +(exported from `@objectstack/metadata/migrations`). Splits each legacy +`sys_notification` inbox row into `sys_inbox_message` + a receipt, rewrites the +row to the event shape, and clears the legacy columns. **Idempotent**; reports +`not_applicable` on fresh installs. + +### Tests +`messaging-service`, `inbox-channel`, `messaging-service-plugin`, `notify-node`, +and the migration all have updated/added coverage. All green. + +--- + +## ⚠️ Breaking change — Console bell (objectui, separate repo) + +The bell read `sys_notification.{recipient_id, is_read, title, body, …}`. Those +fields **no longer exist**. Until objectui is updated, the bell will be empty / +error. **Do the objectui cut-over and the data migration together.** + +### objectui changes required (`app-shell`) +1. **`AppHeader.tsx` / `InboxPopover.tsx`**: poll **`sys_inbox_message`** filtered + by `user_id = {current_user}` (the `mine` list view), ordered by `created_at` + desc — instead of `sys_notification`. +2. **Read-state**: join/read `sys_notification_receipt` for the row's state + (`read` vs `delivered`). The unread badge = inbox rows with no `read`/`clicked` + receipt. +3. **Mark-read**: PATCH the **receipt** (`state: 'read'`, `at`) keyed by + `(notification_id, user_id, channel:'inbox')` — not the inbox row. (A small + REST/endpoint to upsert a receipt may be needed; see P0 follow-up below.) +4. **"View all" / notification center route**: point at `sys_inbox_message` + (`mine`) instead of `sys_notification`. +5. `RecordDetailView` and any other `sys_notification` readers: same repoint. + +### Cut-over sequence (avoid a blank bell) +1. Deploy this framework change (objects + emit + producers). New notifications + now land in `sys_inbox_message` + receipts. +2. Run `migrateSysNotificationToEvent({ driver, data })` to carry existing + notifications into `sys_inbox_message` + receipts. +3. Deploy the objectui bell repoint. + +(Step order tolerates a brief window where new rows exist but the UI hasn't +flipped — the inbox is being populated the whole time.) + +--- + +## Behavior notes / watch-outs + +- **Messaging is now foundational (auto-on).** Collaboration notifications + require the messaging pipeline (with no `messaging` service registered, + `@mention`/assignment are skipped + warned, like the `notify` node). Two seams + guarantee it loads: + - `objectstack serve`: `messaging` is in `Serve.ALWAYS_ON_CAPABILITIES` + (`packages/cli/src/commands/serve.ts`) — every non-`minimal` preset starts it. + - Cloud / per-project kernels (`capability-loader.ts`): no always-on slate, so + the loader now expands `requires` to add `messaging` whenever `audit` is + present. Artifacts requiring `audit` therefore get the pipeline automatically. + + `--preset minimal` (CLI) and artifacts that require neither `audit` nor + `messaging` opt out — collaboration notifications then no-op by design. + +- **Dedup is best-effort in P0.** `emit()` idempotency is a non-transactional + check-then-insert and `sys_notification.dedup_key` is a non-unique index, so a + concurrent duplicate `emit` with the same `dedupKey` can still produce two + events. Robust, race-safe dedup is part of the **P1 outbox** (durable spine + + unique dedup). Assignment `dedupKey`s are scoped by the record's write-version + (`updated_at`) so re-assignments aren't permanently suppressed. + +- **Event-log growth.** Every `emit()` writes one `sys_notification` event row. + High-frequency periodic `notify` flows accumulate rows unbounded; retention / + pruning is a P1+ concern (the event log is the durable audit of what was sent). +- **No mark-read write path yet — required for the objectui cut-over.** P0 added + the receipt object + `delivered` writes, but nothing transitions a receipt to + `read`/`clicked`/`dismissed`. The bell's mark-read therefore needs a small + write ingress (a receipt-upsert REST route or an `sys_inbox_message` action + keyed on `(notification_id, user_id, channel)`), landed **together with** the + objectui bell repoint. The SDK `client.notifications.markRead/list({read})` + helpers target the old `sys_notification` read-state and must be repointed to + the receipt at the same time. Until then read-state is write-less (every row + shows as unread). Decide: tail of P0 (with objectui) vs P1. +- **Translations**: `packages/platform-objects/src/apps/translations/*.generated.ts` + still carry the old `sys_notification` field labels (`is_read`, etc.). Harmless + (unused) but should be regenerated. +- **Audience selectors** `role:`/`team:`/`owner_of:` are accepted by `emit()` but + not yet expanded — they resolve to zero recipients until the P1 + `RecipientResolver`. Today's producers only pass explicit ids/emails, so this is + latent, not active. + +--- + +## Remaining phases (from the build spec) + +- **P1 — Reliable delivery**: `sys_notification_delivery` outbox + dispatcher + (state machine, retry/backoff, dead-letter, `dedup_key`); `RecipientResolver` + (reuse sharing/CEL resolver) owning `role:`/`owner_of:`/`team:`/email→id. Move + the inbox channel's email→id fallback up here. +- **P2 — Subscription + preference**: `sys_notification_subscription` + + `sys_notification_preference` objects + Studio config UI; mandatory-topic + bypass; admin-global + per-user-override defaults. +- **P3 — Channels + templates + digest**: email/push/webhook/Slack channels on + connectors (ADR-0022); `sys_notification_template` (topic×channel×locale) + + renderer; digest / quiet-hours middleware. diff --git a/packages/cli/src/commands/serve.ts b/packages/cli/src/commands/serve.ts index 705d2a697..9398efb1a 100644 --- a/packages/cli/src/commands/serve.ts +++ b/packages/cli/src/commands/serve.ts @@ -163,8 +163,14 @@ export default class Serve extends Command { * Capabilities auto-added to every app's `requires` for every preset * EXCEPT `minimal`. These form the foundation that every server-side * runtime expects to exist (background work, settings persistence, - * transactional mail, file uploads). Apps may still list these in - * `requires:` explicitly — duplicates are de-duped. + * transactional mail, file uploads, notifications). Apps may still list + * these in `requires:` explicitly — duplicates are de-duped. + * + * `messaging` is foundational because, post-ADR-0030, notifications flow + * through a single ingress (`NotificationService.emit`): collaboration + * `@mention` / assignment (plugin-audit) and the `notify` flow node deliver + * via the messaging pipeline, and the Console bell reads its materialization + * (`sys_inbox_message`). Without it those notifications silently no-op. * * Opt out: `objectstack serve --preset minimal`. * @@ -172,7 +178,7 @@ export default class Serve extends Command { * mirror this list on their per-project kernels. */ static readonly ALWAYS_ON_CAPABILITIES: readonly string[] = Object.freeze([ - 'queue', 'job', 'cache', 'settings', 'email', 'storage', 'sharing', + 'queue', 'job', 'cache', 'settings', 'email', 'storage', 'sharing', 'messaging', ]); /** diff --git a/packages/metadata/src/migrations/index.ts b/packages/metadata/src/migrations/index.ts index 052a6e169..d007799cc 100644 --- a/packages/metadata/src/migrations/index.ts +++ b/packages/metadata/src/migrations/index.ts @@ -16,3 +16,8 @@ export { addSysMetadataOverlayIndex, type AddSysMetadataOverlayIndexResult, } from './add-sys-metadata-overlay-index.js'; +export { + migrateSysNotificationToEvent, + type SysNotificationMigrationResult, + type SysNotificationMigrationOptions, +} from './migrate-sys-notification-to-event.js'; diff --git a/packages/metadata/src/migrations/migrate-sys-notification-to-event.test.ts b/packages/metadata/src/migrations/migrate-sys-notification-to-event.test.ts new file mode 100644 index 000000000..93871a462 --- /dev/null +++ b/packages/metadata/src/migrations/migrate-sys-notification-to-event.test.ts @@ -0,0 +1,140 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect } from 'vitest'; +import { migrateSysNotificationToEvent } from './migrate-sys-notification-to-event.js'; + +/** Columns the legacy (pre-ADR-0030) sys_notification table physically has. */ +const LEGACY_TABLE_COLUMNS = [ + 'id', 'recipient_id', 'type', 'title', 'body', 'url', 'actor_name', + 'is_read', 'read_at', 'created_at', 'organization_id', 'topic', 'payload', 'severity', +]; + +function fakeDriver(rows: any[], columns: string[] = LEGACY_TABLE_COLUMNS) { + const updates: Array<{ sql: string; bindings: any[] }> = []; + return { + updates, + driver: { + async raw(sql: string, bindings: any[] = []) { + if (sql.startsWith('PRAGMA table_info')) { + return columns.map((name) => ({ name })); + } + if (sql.startsWith('SELECT id, recipient_id')) { + return rows; + } + if (sql.startsWith('UPDATE')) { + updates.push({ sql, bindings }); + return []; + } + return []; + }, + } as any, + }; +} + +function fakeEngine() { + const inserts: Array<{ object: string; row: any }> = []; + const updates: Array<{ object: string; data: any }> = []; + return { + inserts, + updates, + engine: { + async insert(object: string, row: any) { + inserts.push({ object, row }); + return { id: `${object}_${inserts.length}`, ...row }; + }, + async update(object: string, data: any) { + updates.push({ object, data }); + return data; + }, + async find() { return []; }, + async findOne() { return null; }, + async delete() { return {}; }, + async count() { return 0; }, + async aggregate() { return []; }, + } as any, + }; +} + +describe('migrateSysNotificationToEvent', () => { + it('splits each legacy row into inbox + receipt and rewrites the event', async () => { + const d = fakeDriver([ + { id: 'n1', recipient_id: 'u1', type: 'mention', title: 'You were mentioned', body: 'hi', url: '/x', actor_name: 'Ada', is_read: 0, read_at: null, created_at: '2026-01-01T00:00:00.000Z', organization_id: 'org_1' }, + { id: 'n2', recipient_id: 'u2', type: 'assignment', title: 'Assigned', body: null, url: null, actor_name: null, is_read: 1, read_at: '2026-02-02T00:00:00.000Z', created_at: '2026-02-01T00:00:00.000Z', organization_id: 'org_1' }, + ]); + const e = fakeEngine(); + + const result = await migrateSysNotificationToEvent({ driver: d.driver, data: e.engine }); + + expect(result.status).toBe('migrated'); + expect(result.migrated).toBe(2); + + const inbox = e.inserts.filter((i) => i.object === 'sys_inbox_message'); + const receipts = e.inserts.filter((i) => i.object === 'sys_notification_receipt'); + expect(inbox).toHaveLength(2); + expect(receipts).toHaveLength(2); + + // Row 1: unread → delivered receipt; inbox keyed by recipient, linked to event. + expect(inbox[0].row).toMatchObject({ user_id: 'u1', notification_id: 'n1', title: 'You were mentioned', action_url: '/x', organization_id: 'org_1' }); + expect(receipts[0].row).toMatchObject({ notification_id: 'n1', user_id: 'u1', channel: 'inbox', state: 'delivered' }); + + // Row 2: read → read receipt carrying read_at. + expect(receipts[1].row).toMatchObject({ notification_id: 'n2', user_id: 'u2', state: 'read', at: '2026-02-02T00:00:00.000Z' }); + + // The event row is rewritten (topic ← type, payload built) and legacy columns nulled. + const ev = e.updates.filter((u) => u.object === 'sys_notification'); + expect(ev[0].data).toMatchObject({ id: 'n1', topic: 'mention', payload: { title: 'You were mentioned', url: '/x', actorName: 'Ada' } }); + expect(d.updates).toHaveLength(2); + expect(d.updates[0].sql).toContain('"recipient_id" = NULL'); + expect(d.updates[0].bindings).toEqual(['n1']); + }); + + it('works on a Postgres-style driver where PRAGMA throws (information_schema fallback)', async () => { + // PRAGMA raises a syntax error on Postgres; columnExists must fall + // through to information_schema rather than reporting not_applicable. + const rows = [ + { id: 'n1', recipient_id: 'u1', type: 'mention', title: 'hi', body: null, url: null, actor_name: null, is_read: false, read_at: null, created_at: '2026-01-01T00:00:00.000Z', organization_id: 'org_1' }, + ]; + const updates: Array<{ sql: string; bindings: any[] }> = []; + const pgDriver = { + async raw(sql: string, bindings: any[] = []) { + if (sql.startsWith('PRAGMA')) throw new Error('syntax error at or near "PRAGMA"'); + if (sql.includes('information_schema')) { + // bindings = [table, column]; report the column as present. + return [{ column_name: bindings[1] }]; + } + if (sql.startsWith('SELECT id, recipient_id')) return rows; + if (sql.startsWith('UPDATE')) { updates.push({ sql, bindings }); return []; } + return []; + }, + } as any; + const e = fakeEngine(); + + const result = await migrateSysNotificationToEvent({ driver: pgDriver, data: e.engine }); + + expect(result.status).toBe('migrated'); + expect(result.migrated).toBe(1); + expect(e.inserts.map((i) => i.object)).toEqual(['sys_inbox_message', 'sys_notification_receipt']); + }); + + it('is idempotent — no legacy rows means already_done', async () => { + const d = fakeDriver([]); + const e = fakeEngine(); + const result = await migrateSysNotificationToEvent({ driver: d.driver, data: e.engine }); + expect(result.status).toBe('already_done'); + expect(e.inserts).toHaveLength(0); + }); + + it('reports not_applicable when the table never had a recipient_id column', async () => { + const d = fakeDriver([], ['id', 'topic', 'payload', 'severity', 'created_at']); + const e = fakeEngine(); + const result = await migrateSysNotificationToEvent({ driver: d.driver, data: e.engine }); + expect(result.status).toBe('not_applicable'); + }); + + it('errors cleanly when the driver has no raw()', async () => { + const e = fakeEngine(); + const result = await migrateSysNotificationToEvent({ driver: {} as any, data: e.engine }); + expect(result.status).toBe('error'); + expect(result.error).toContain('.raw'); + }); +}); diff --git a/packages/metadata/src/migrations/migrate-sys-notification-to-event.ts b/packages/metadata/src/migrations/migrate-sys-notification-to-event.ts new file mode 100644 index 000000000..64630bf70 --- /dev/null +++ b/packages/metadata/src/migrations/migrate-sys-notification-to-event.ts @@ -0,0 +1,214 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +/** + * Migration: sys_notification (per-user inbox) → notification event (ADR-0030) + * + * ADR-0030 re-models `sys_notification` from a per-user *inbox* into the L2 + * *event* (one row per `emit`). This migration preserves users' existing bell + * notifications across the cut-over by splitting each legacy row into the new + * layered model: + * + * legacy sys_notification row (recipient_id, type, title, body, url, + * actor_name, is_read, read_at, …) + * │ + * ├─► sys_inbox_message (L5 in-app materialization, keyed by user) + * ├─► sys_notification_receipt (L5 read-state: 'read' if is_read else 'delivered') + * └─► the sys_notification row itself is rewritten to the event shape + * (topic ← type, payload ← {title,body,url,actor_name}) and its legacy + * inbox columns are cleared. + * + * Idempotent: it acts only on rows that still carry the legacy shape + * (`recipient_id IS NOT NULL`); a second run is a no-op. Safe when the legacy + * columns were never present (a fresh install created directly in the new + * shape) — it reports `not_applicable`. + * + * Usage: + * import { migrateSysNotificationToEvent } from '@objectstack/metadata/migrations'; + * await migrateSysNotificationToEvent({ driver, data }); + * + * `driver` provides raw access to read legacy columns the re-modeled schema no + * longer projects and to clear them; `data` (IDataEngine) performs the + * structured inbox/receipt writes and the event rewrite so ids, JSON fields and + * tenant stamping are handled uniformly across drivers. + */ + +import type { IDataDriver, IDataEngine } from '@objectstack/spec/contracts'; + +const EVENT_OBJECT = 'sys_notification'; +const INBOX_OBJECT = 'sys_inbox_message'; +const RECEIPT_OBJECT = 'sys_notification_receipt'; + +/** Legacy inbox columns cleared once a row is rewritten to the event shape. */ +const LEGACY_COLUMNS = [ + 'recipient_id', + 'type', + 'title', + 'body', + 'url', + 'actor_name', + 'is_read', + 'read_at', +] as const; + +export interface SysNotificationMigrationResult { + status: 'migrated' | 'already_done' | 'not_applicable' | 'error'; + /** Number of legacy rows split into inbox + receipt + event. */ + migrated: number; + error?: string; +} + +export interface SysNotificationMigrationOptions { + driver: IDataDriver; + data: IDataEngine; + /** Defaults to `() => new Date().toISOString()`. */ + now?(): string; +} + +export async function migrateSysNotificationToEvent( + opts: SysNotificationMigrationOptions, +): Promise { + const driver = opts.driver as any; + const { data } = opts; + const now = opts.now ?? (() => new Date().toISOString()); + + if (typeof driver?.raw !== 'function') { + return { + status: 'error', + migrated: 0, + error: 'migrateSysNotificationToEvent: driver must expose a .raw(sql, bindings?) method.', + }; + } + + // No legacy `recipient_id` column → the table never held the inbox shape. + if (!(await columnExists(driver, EVENT_OBJECT, 'recipient_id'))) { + return { status: 'not_applicable', migrated: 0 }; + } + + // Only null-out columns that actually exist on this deployment. + const presentLegacy: string[] = []; + for (const col of LEGACY_COLUMNS) { + if (await columnExists(driver, EVENT_OBJECT, col)) presentLegacy.push(col); + } + + let migrated = 0; + try { + const rows = await selectLegacyRows(driver); + if (rows.length === 0) return { status: 'already_done', migrated: 0 }; + + for (const row of rows) { + const id = String(row.id); + const recipientId = row.recipient_id != null ? String(row.recipient_id) : null; + if (!recipientId) continue; // defensive — guarded by the SELECT filter + const orgId = row.organization_id != null ? String(row.organization_id) : null; + const createdAt = row.created_at != null ? String(row.created_at) : now(); + const title = row.title != null ? String(row.title) : (row.type != null ? String(row.type) : 'Notification'); + const isRead = row.is_read === true || row.is_read === 1 || row.is_read === '1'; + // One topic for both the inbox row and the rewritten event, so the + // materialization and its L2 event never disagree (empty/null legacy + // `type` → 'legacy'). + const eventTopic = row.type != null && String(row.type).length > 0 ? String(row.type) : 'legacy'; + + // L5 in-app materialization. + await data.insert(INBOX_OBJECT, { + user_id: recipientId, + notification_id: id, + topic: eventTopic, + title, + body_md: row.body ?? null, + severity: 'info', + action_url: row.url ?? null, + organization_id: orgId, + created_at: createdAt, + }); + + // L5 receipt (read-state spine). + await data.insert(RECEIPT_OBJECT, { + notification_id: id, + delivery_id: null, + user_id: recipientId, + channel: 'inbox', + state: isRead ? 'read' : 'delivered', + at: isRead && row.read_at != null ? String(row.read_at) : createdAt, + organization_id: orgId, + created_at: createdAt, + }); + + // Rewrite the row itself to the L2 event shape (engine handles JSON). + await data.update( + EVENT_OBJECT, + { + id, + topic: eventTopic, + severity: 'info', + payload: { + title: row.title ?? null, + body: row.body ?? null, + url: row.url ?? null, + actorName: row.actor_name ?? null, + }, + }, + { where: { id } }, + ); + + // Clear the legacy inbox columns so the row no longer matches the + // migration filter (idempotency) and carries no stale recipient. + if (presentLegacy.length > 0) { + const setClause = presentLegacy.map((c) => `"${c}" = NULL`).join(', '); + await driver.raw(`UPDATE "${EVENT_OBJECT}" SET ${setClause} WHERE id = ?`, [id]); + } + + migrated += 1; + } + + return { status: 'migrated', migrated }; + } catch (err: any) { + return { status: 'error', migrated, error: err?.message ?? String(err) }; + } +} + +// --------------------------------------------------------------------------- +// Internal helpers +// --------------------------------------------------------------------------- + +async function selectLegacyRows(driver: any): Promise { + const result: any[] = await driver.raw( + `SELECT id, recipient_id, type, title, body, url, actor_name, is_read, read_at, created_at, organization_id ` + + `FROM "${EVENT_OBJECT}" WHERE recipient_id IS NOT NULL`, + ); + // knex wraps some results as `[rows]`; normalize both shapes. + if (Array.isArray(result) && result.length > 0 && Array.isArray(result[0])) { + return result[0]; + } + return Array.isArray(result) ? result : []; +} + +async function columnExists(driver: any, table: string, column: string): Promise { + // SQLite path: PRAGMA table_info. On Postgres/others this raises a syntax + // error — swallow it *locally* and fall through to information_schema (the + // outer-catch version of this would never reach the fallback, making the + // migration silently no-op on every non-SQLite DB). + try { + const rows: any = await driver.raw(`PRAGMA table_info("${table}")`); + const list: any[] = Array.isArray(rows) + ? (Array.isArray(rows[0]) ? rows[0] : rows) + : []; + if (list.length > 0 && list.some((r: any) => r?.name != null)) { + return list.some((r: any) => r?.name === column); + } + } catch { + /* not SQLite — fall through to information_schema */ + } + // Postgres / others. + try { + const result: any = await driver.raw( + `SELECT column_name FROM information_schema.columns WHERE table_name = ? AND column_name = ?`, + [table, column], + ); + const list: any[] = Array.isArray(result) + ? (Array.isArray(result[0]) ? result[0] : result) + : []; + return list.length > 0; + } catch { + return false; + } +} diff --git a/packages/platform-objects/src/apps/account.app.ts b/packages/platform-objects/src/apps/account.app.ts index 0b2a1d343..2fcbec7ad 100644 --- a/packages/platform-objects/src/apps/account.app.ts +++ b/packages/platform-objects/src/apps/account.app.ts @@ -73,13 +73,15 @@ export const ACCOUNT_APP: App = { defaultOpen: true, children: [ { + // ADR-0030: the user-facing inbox is the materialization + // (sys_inbox_message), not the L2 event (sys_notification). id: 'nav_account_notifications', type: 'object', label: 'Notifications', - objectName: 'sys_notification', - viewName: 'unread', + objectName: 'sys_inbox_message', + viewName: 'mine', icon: 'bell', - requiresObject: 'sys_notification', + requiresObject: 'sys_inbox_message', }, { id: 'nav_account_approvals', diff --git a/packages/platform-objects/src/apps/setup.app.ts b/packages/platform-objects/src/apps/setup.app.ts index a16db6b75..afb11d1a0 100644 --- a/packages/platform-objects/src/apps/setup.app.ts +++ b/packages/platform-objects/src/apps/setup.app.ts @@ -187,7 +187,7 @@ export const SETUP_APP: App = { // record pages, not platform admin surfaces. { id: 'nav_sessions', type: 'object', label: 'Sessions', objectName: 'sys_session', icon: 'monitor' }, { id: 'nav_audit_logs', type: 'object', label: 'Audit Logs', objectName: 'sys_audit_log', icon: 'scroll-text' }, - { id: 'nav_notifications', type: 'object', label: 'Notifications', objectName: 'sys_notification', icon: 'bell', requiresObject: 'sys_notification' }, + { id: 'nav_notifications', type: 'object', label: 'Notification Events', objectName: 'sys_notification', viewName: 'recent', icon: 'bell', requiresObject: 'sys_notification' }, ], }, { diff --git a/packages/platform-objects/src/audit/sys-notification.object.ts b/packages/platform-objects/src/audit/sys-notification.object.ts index e8a3d26ca..8495d45ba 100644 --- a/packages/platform-objects/src/audit/sys-notification.object.ts +++ b/packages/platform-objects/src/audit/sys-notification.object.ts @@ -3,113 +3,60 @@ import { ObjectSchema, Field } from '@objectstack/spec/data'; /** - * sys_notification — Per-User Inbox Notification + * sys_notification — Notification Event (ADR-0030 Layer 2) * - * Personal, unread-trackable notifications. Distinct from - * `sys_activity` (per-record, append-only narrative) and - * `sys_audit_log` (compliance-grade structured diff). Each row - * targets exactly one user (`recipient_id`) and is the source of - * truth for the header bell badge. + * **Re-modeled (ADR-0030)**: this object was previously a per-user inbox. It is + * now the platform's notification **event** — one row per `emit()`, the single + * ingress through which every producer (the flow `notify` node, collaboration + * `@mention`, record assignment, system alerts) publishes. It carries no + * recipient and no read-state: those live downstream in the delivery + * materialization (`sys_inbox_message`) and the receipt + * (`sys_notification_receipt`). * - * Typical writers: comment mention, record assignment, lead-convert - * completion, flow notifications. Typical readers: header bell, - * notification center. + * Layering (ADR-0012 / ADR-0030): + * L2 event → this object (`topic` / `payload` / `dedup_key` / `severity`) + * L4 delivery → `sys_notification_delivery` (outbox; P1) + * L5 materialize → `sys_inbox_message` (in-app), email/push/… per channel + * L5 receipt → `sys_notification_receipt` (read/clicked/dismissed) + * + * Writers: `NotificationService.emit()` only — **no producer writes this row + * directly** (single-ingress rule). Readers: the delivery pipeline; the admin + * notification-event log. The Console bell reads `sys_inbox_message`, not this. * * @namespace sys */ export const SysNotification = ObjectSchema.create({ name: 'sys_notification', - label: 'Notification', - pluralLabel: 'Notifications', + label: 'Notification Event', + pluralLabel: 'Notification Events', icon: 'bell', isSystem: true, managedBy: 'system', - description: 'Per-user notification inbox entries', - displayNameField: 'title', - titleFormat: '{title}', - compactLayout: ['title', 'type', 'is_read', 'created_at'], - - /** - * Row-level inbox actions. Use `visible` CEL expressions to ensure - * `mark_read` only shows on unread rows and vice-versa, mirroring the - * mark-as-read affordances in GitHub / Linear inboxes. The toolbar-level - * `mark_all_read` is intentionally omitted server-side: it requires a - * bulk update primitive that doesn't yet exist on the REST surface, and - * the popover already handles the multi-row case client-side via N - * single-row PATCHes (see `InboxPopover.tsx` -> AppHeader `markAllRead`). - */ - actions: [ - { - name: 'mark_read', - label: 'Mark as Read', - icon: 'check', - variant: 'secondary', - mode: 'custom', - locations: ['list_item'], - type: 'api', - method: 'PATCH', - target: '/api/v1/data/sys_notification/{id}', - bodyExtra: { is_read: true }, - visible: '!record.is_read', - successMessage: 'Notification marked as read', - refreshAfter: true, - }, - { - name: 'mark_unread', - label: 'Mark as Unread', - icon: 'bell-dot', - variant: 'secondary', - mode: 'custom', - locations: ['list_item'], - type: 'api', - method: 'PATCH', - target: '/api/v1/data/sys_notification/{id}', - bodyExtra: { is_read: false, read_at: null }, - visible: 'record.is_read', - successMessage: 'Notification marked as unread', - refreshAfter: true, - }, - ], + description: 'Notification events — one row per emit() (ADR-0030 Layer 2 ingress)', + displayNameField: 'topic', + titleFormat: '{topic}', + compactLayout: ['topic', 'severity', 'source_object', 'created_at'], listViews: { - unread: { + recent: { type: 'grid', - name: 'unread', - label: 'Unread', + name: 'recent', + label: 'Recent', data: { provider: 'object', object: 'sys_notification' }, - // Title + actor first (the "who/what" the user actually scans); - // type stays as a categorising chip; created_at right-aligned. - columns: ['title', 'actor_name', 'type', 'created_at'], - filter: [ - { field: 'recipient_id', operator: 'equals', value: '{current_user_id}' }, - { field: 'is_read', operator: 'equals', value: false }, - ], + columns: ['topic', 'severity', 'actor_id', 'source_object', 'created_at'], sort: [{ field: 'created_at', order: 'desc' }], pagination: { pageSize: 50 }, - emptyState: { title: 'Inbox zero', message: 'No unread notifications.' }, + emptyState: { title: 'No events', message: 'No notification events have been emitted.' }, }, - mine: { + by_topic: { type: 'grid', - name: 'mine', - label: 'Mine', + name: 'by_topic', + label: 'By Topic', data: { provider: 'object', object: 'sys_notification' }, - columns: ['title', 'actor_name', 'type', 'is_read', 'created_at'], - filter: [{ field: 'recipient_id', operator: 'equals', value: '{current_user_id}' }], - sort: [{ field: 'created_at', order: 'desc' }], - pagination: { pageSize: 50 }, - // Group by notification category so mention/assignment storms don't - // hide system or task_due rows. Users still toggle to flat via the - // toolbar Group control if they prefer chronology only. - grouping: { fields: [{ field: 'type', order: 'asc', collapsed: false }] }, - }, - all_notifications: { - type: 'grid', - name: 'all_notifications', - label: 'All', - data: { provider: 'object', object: 'sys_notification' }, - columns: ['title', 'recipient_id', 'actor_name', 'type', 'is_read', 'created_at'], + columns: ['topic', 'severity', 'source_object', 'source_id', 'created_at'], sort: [{ field: 'created_at', order: 'desc' }], pagination: { pageSize: 100 }, + grouping: { fields: [{ field: 'topic', order: 'asc', collapsed: false }] }, }, }, @@ -121,40 +68,37 @@ export const SysNotification = ObjectSchema.create({ group: 'System', }), - // ── Routing ────────────────────────────────────────────────── - recipient_id: Field.lookup('sys_user', { - label: 'Recipient', + // ── Event identity ─────────────────────────────────────────── + topic: Field.text({ + label: 'Topic', required: true, + maxLength: 200, searchable: true, - description: 'User the notification is delivered to', - group: 'Routing', + description: 'Notification topic, e.g. task.assigned, collab.mention', + group: 'Event', }), - // ── Content ────────────────────────────────────────────────── - type: Field.select( - ['mention', 'assignment', 'comment_reply', 'lead_converted', 'task_due', 'system'], - { - label: 'Type', - required: true, - defaultValue: 'system', - description: 'Notification category — drives icon + sort priority', - group: 'Content', - }, - ), + payload: Field.json({ + label: 'Payload', + required: false, + description: 'Template inputs carried to channels (title/body/url/actor/source/…)', + group: 'Event', + }), - title: Field.text({ - label: 'Title', - required: true, - maxLength: 255, - searchable: true, - group: 'Content', + severity: Field.select(['info', 'warning', 'critical'], { + label: 'Severity', + required: false, + defaultValue: 'info', + description: 'Severity hint for rendering / filtering', + group: 'Event', }), - body: Field.textarea({ - label: 'Body', + dedup_key: Field.text({ + label: 'Dedup Key', required: false, - description: 'Optional secondary text (one-line summary)', - group: 'Content', + maxLength: 255, + description: 'Idempotency key within a topic window; a repeat emit is a no-op', + group: 'Event', }), // ── Source linkage ─────────────────────────────────────────── @@ -174,40 +118,13 @@ export const SysNotification = ObjectSchema.create({ group: 'Source', }), - url: Field.url({ - label: 'Deep Link', - required: false, - description: 'Optional URL to navigate to when clicked', - group: 'Source', - }), - actor_id: Field.lookup('sys_user', { label: 'Actor', required: false, - description: 'User who caused the notification (mentioner, assigner)', + description: 'User who caused the event (mentioner, assigner)', group: 'Source', }), - actor_name: Field.text({ - label: 'Actor Name', - required: false, - group: 'Source', - }), - - // ── Read state ─────────────────────────────────────────────── - is_read: Field.boolean({ - label: 'Read', - defaultValue: false, - description: 'True once recipient acknowledges', - group: 'State', - }), - - read_at: Field.datetime({ - label: 'Read At', - required: false, - group: 'State', - }), - // ── Lifecycle ──────────────────────────────────────────────── created_at: Field.datetime({ label: 'Created At', @@ -216,17 +133,11 @@ export const SysNotification = ObjectSchema.create({ readonly: true, group: 'System', }), - - updated_at: Field.datetime({ - label: 'Updated At', - required: false, - group: 'System', - }), }, indexes: [ - { fields: ['recipient_id', 'is_read', 'created_at'] }, - { fields: ['recipient_id', 'created_at'] }, + { fields: ['topic', 'created_at'] }, + { fields: ['dedup_key'] }, { fields: ['source_object', 'source_id'] }, ], }); diff --git a/packages/plugins/plugin-audit/src/audit-plugin.ts b/packages/plugins/plugin-audit/src/audit-plugin.ts index 25963e516..be66b5c6c 100644 --- a/packages/plugins/plugin-audit/src/audit-plugin.ts +++ b/packages/plugins/plugin-audit/src/audit-plugin.ts @@ -3,7 +3,7 @@ import type { Plugin, PluginContext } from '@objectstack/core'; import type { IDataEngine } from '@objectstack/spec/contracts'; import { SysAuditLog, SysActivity, SysComment, SysAttachment, SysNotification } from '@objectstack/platform-objects/audit'; -import { installAuditWriters } from './audit-writers.js'; +import { installAuditWriters, type MessagingEmitSurface } from './audit-writers.js'; /** * AuditPlugin @@ -59,7 +59,18 @@ export class AuditPlugin implements Plugin { ctx.logger.warn('AuditPlugin: ObjectQL engine not available — audit writers NOT installed'); return; } - installAuditWriters(engine as any, this.name); + // Resolve the messaging service lazily at hook time so collaboration + // @mention / assignment notifications go through the ADR-0030 single + // ingress (emit) instead of writing sys_notification directly. Messaging + // may register after audit; lazy resolution tolerates either order. + const getMessaging = (): MessagingEmitSurface | undefined => { + try { + return ctx.getService('messaging'); + } catch { + return undefined; + } + }; + installAuditWriters(engine as any, this.name, { getMessaging }); process.stderr.write('[AuditPlugin] writers installed\n'); ctx.logger.info('AuditPlugin: audit + activity writers installed'); }); diff --git a/packages/plugins/plugin-audit/src/audit-writers.ts b/packages/plugins/plugin-audit/src/audit-writers.ts index da026e75c..ab95e3661 100644 --- a/packages/plugins/plugin-audit/src/audit-writers.ts +++ b/packages/plugins/plugin-audit/src/audit-writers.ts @@ -3,6 +3,37 @@ import type { HookContext } from '@objectstack/spec/data'; import type { IDataEngine } from '@objectstack/spec/contracts'; +/** + * Minimal structural view of `NotificationService.emit` (ADR-0030). Declared + * locally so plugin-audit takes no runtime dependency on service-messaging — it + * resolves whatever object is registered under the `messaging` service at hook + * time and routes collaboration notifications through the single ingress. + */ +export interface MessagingEmitSurface { + emit(input: { + topic: string; + audience: string[]; + payload?: Record; + severity?: 'info' | 'warning' | 'critical'; + dedupKey?: string; + source?: { object: string; id: string }; + actorId?: string; + organizationId?: string; + }): Promise; +} + +/** Options for {@link installAuditWriters}. */ +export interface AuditWriterOptions { + /** + * Lazily resolve the messaging service so collaboration `@mention` / + * assignment notifications go through the ADR-0030 single ingress rather than + * writing `sys_notification` directly. Returns `undefined` when messaging is + * not installed — those notifications are then skipped (no pipeline, no bell), + * matching the `notify` node's degradation. + */ + getMessaging?(): MessagingEmitSurface | undefined; +} + /** * Audit writer hook installer. * @@ -102,9 +133,15 @@ function safeStringify(v: any): string { * `packageId` — calling twice with the same id replaces the previous * registration. */ -export function installAuditWriters(engine: any, packageId = 'com.objectstack.audit'): void { +export function installAuditWriters( + engine: any, + packageId = 'com.objectstack.audit', + opts: AuditWriterOptions = {}, +): void { if (!engine || typeof engine.registerHook !== 'function') return; + const getMessaging = opts.getMessaging ?? (() => undefined); + // Remove any prior installation so we can safely re-install on hot reload. if (typeof engine.unregisterHooksByPackage === 'function') { engine.unregisterHooksByPackage(packageId); @@ -252,17 +289,16 @@ export function installAuditWriters(engine: any, packageId = 'com.objectstack.au const sys = api.sudo(); await sys.object('sys_audit_log').create(auditRow); await sys.object('sys_activity').create(activityRow); - // M10.8: write per-user inbox notifications. Best-effort; never - // throws into the user-facing CRUD path. Covers two common cases: + // M10.8 / ADR-0030: notify the assignee. Best-effort; never throws into + // the user-facing CRUD path. Goes through the messaging single ingress + // (`emit`) — the inbox channel materializes the bell row — rather than + // writing `sys_notification` directly. If owner_id / assigned_to was + // newly set (or changed to a different user) on a non-system record, the + // recipient sees "Lead X was assigned to you" without polling. // - // 1. Assignment — if owner_id / assigned_to was newly set (or - // changed to a different user) on a non-system record, drop - // a notification into the recipient's inbox so they can see - // "Lead X was assigned to you" without polling the record. - // - // 2. (Comment mentions are handled separately by the sys_comment - // hook below since SKIP_OBJECTS excludes it from this writer.) - await writeAssignmentNotifications(sys, { + // (Comment mentions are handled separately by the sys_comment hook below + // since SKIP_OBJECTS excludes it from this writer.) + await writeAssignmentNotifications(getMessaging(), { object: ctx.object, recordId: recordId ?? null, label, @@ -292,8 +328,8 @@ export function installAuditWriters(engine: any, packageId = 'com.objectstack.au const writeCommentMentions = async (ctx: HookContext) => { if (ctx.object !== 'sys_comment') return; if (ctx.event !== 'afterInsert') return; - const api: any = (ctx as any).api; - if (!api?.sudo) return; + const messaging = getMessaging(); + if (!messaging) return; // no pipeline installed → no mention notifications const row: any = ctx.result; if (!row || typeof row !== 'object') return; @@ -317,26 +353,29 @@ export function installAuditWriters(engine: any, packageId = 'com.objectstack.au const bodyPreview = String(row.body ?? '').slice(0, 240); const sess: any = (ctx as any).session ?? {}; const tenantId: string | null = sess.tenantId ?? row.organization_id ?? null; + const commentId = row.id != null ? String(row.id) : null; - const sys = api.sudo(); for (const uid of userIds) { if (uid === actorId) continue; // don't notify the mention author try { - await sys.object('sys_notification').create({ - recipient_id: uid, - type: 'mention', - title: actorName ? `${actorName} mentioned you` : 'You were mentioned', - body: bodyPreview, - source_object: source_object || null, - source_id: source_id || null, - actor_id: actorId, - actor_name: actorName, - is_read: false, - // Stamp tenant so the recipient's RLS sees it (see writeAssignmentNotifications). - organization_id: tenantId, + // ADR-0030 single ingress — emit() writes the L2 event and the inbox + // channel materializes the bell row + a delivered receipt. + await messaging.emit({ + topic: 'collab.mention', + audience: [uid], + severity: 'info', + source: source_object ? { object: source_object, id: source_id ?? '' } : undefined, + actorId: actorId ?? undefined, + organizationId: tenantId ?? undefined, + dedupKey: commentId ? `collab.mention:${commentId}:${uid}` : undefined, + payload: { + title: actorName ? `${actorName} mentioned you` : 'You were mentioned', + body: bodyPreview, + actorName, + }, }); } catch (err) { - try { (engine as any).logger?.warn?.('Mention notification write failed', { uid, err: String((err as any)?.message ?? err) }); } catch {} + try { (engine as any).logger?.warn?.('Mention notification emit failed', { uid, err: String((err as any)?.message ?? err) }); } catch {} } } }; @@ -360,7 +399,7 @@ function pickOwner(rec: any): string | null { } async function writeAssignmentNotifications( - sys: any, + messaging: MessagingEmitSurface | undefined, params: { object: string; recordId: string | null; @@ -372,6 +411,7 @@ async function writeAssignmentNotifications( tenantId: string | null; }, ): Promise { + if (!messaging) return; // no pipeline installed → no assignment notifications if (params.action === 'delete') return; if (!params.recordId) return; @@ -382,21 +422,32 @@ async function writeAssignmentNotifications( if (newOwner === params.actorId) return; // self-assignment is silent try { - await sys.object('sys_notification').create({ - recipient_id: newOwner, - type: 'assignment', - title: `${params.object} "${params.label}" assigned to you`, - body: null, - source_object: params.object, - source_id: params.recordId, - actor_id: params.actorId, - actor_name: null, - is_read: false, - // Stamp organization_id so the recipient (who lives in the same - // tenant as the action) sees the notification through RLS. Without - // this, sys_notification rows insert with NULL organization_id and - // the recipient's `tenant_isolation` policy denies them. - organization_id: params.tenantId, + // ADR-0030 single ingress — emit() writes the L2 event and the inbox + // channel materializes the bell row + a delivered receipt. organizationId + // is propagated so the recipient (same tenant as the action) sees the + // materialized row through RLS. + // Dedup only a true double-fire of the SAME write: scope the key by the + // record's write-version (updated_at). Without a version component the key + // would be permanent and a legitimate re-assignment back to a prior owner + // would be silently suppressed. When no version field exists, omit the key + // (every assignment notifies — same as the pre-ADR-0030 direct-write path). + const writeVersion = + (params.after && typeof params.after === 'object' + ? params.after.updated_at ?? params.after.modified_at ?? params.after.updated_date + : null) ?? null; + await messaging.emit({ + topic: 'collab.assignment', + audience: [newOwner], + severity: 'info', + source: { object: params.object, id: params.recordId }, + actorId: params.actorId ?? undefined, + organizationId: params.tenantId ?? undefined, + dedupKey: writeVersion + ? `collab.assignment:${params.object}:${params.recordId}:${newOwner}:${writeVersion}` + : undefined, + payload: { + title: `${params.object} "${params.label}" assigned to you`, + }, }); } catch { // best-effort; never throw into CRUD path diff --git a/packages/runtime/src/cloud/capability-loader.ts b/packages/runtime/src/cloud/capability-loader.ts index 4927d8b47..a61e71f10 100644 --- a/packages/runtime/src/cloud/capability-loader.ts +++ b/packages/runtime/src/cloud/capability-loader.ts @@ -132,7 +132,18 @@ export async function loadCapabilities(opts: LoadCapabilitiesOptions): Promise { expect(messaging.emitted).toHaveLength(1); expect(messaging.emitted[0]).toMatchObject({ topic: 'deal.won', - title: 'Deal Acme closed', - body: 'Congrats on Acme', - recipients: ['user_1', 'user_2'], + audience: ['user_1', 'user_2'], channels: ['inbox', 'email'], severity: 'info', - actionUrl: '/opps/42', + payload: { + title: 'Deal Acme closed', + body: 'Congrats on Acme', + url: '/opps/42', + }, }); expect(result.output).toMatchObject({ 'notify.delivered': 2 }); }); @@ -122,7 +124,7 @@ describe('notify (baseline node)', () => { })); const result = await engine.execute('notify_flow'); expect(result.success).toBe(true); - expect(messaging.emitted[0]).toMatchObject({ title: 'Heads up', recipients: ['user_9'] }); + expect(messaging.emitted[0]).toMatchObject({ audience: ['user_9'], payload: { title: 'Heads up' } }); }); it('fails the step when title is missing', async () => { diff --git a/packages/services/service-automation/src/builtin/notify-node.ts b/packages/services/service-automation/src/builtin/notify-node.ts index 90bb02f80..085f63478 100644 --- a/packages/services/service-automation/src/builtin/notify-node.ts +++ b/packages/services/service-automation/src/builtin/notify-node.ts @@ -14,16 +14,16 @@ import { interpolate } from './template.js'; * degrades to a no-op success. */ export interface MessagingServiceSurface { - emit(notification: { - topic?: string; - title: string; - body: string; + emit(input: { + topic: string; + audience: string[]; + payload?: Record; severity?: string; - recipients: string[]; + dedupKey?: string; + source?: { object: string; id: string }; + actorId?: string; channels?: string[]; - actionUrl?: string; - payload?: Record; - }): Promise<{ delivered: number; failed: number }>; + }): Promise<{ notificationId: string; delivered: number; failed: number }>; } /** Coerce a config value (string | string[]) into a clean string[]. */ @@ -99,19 +99,24 @@ export function registerNotifyNode(engine: AutomationEngine, ctx: PluginContext) } try { + // ADR-0030 single ingress: hand the messaging service a topic + + // audience + payload; it writes the L2 event and materializes + // per channel. title/body/url ride in the payload (templates in + // a later phase fall back to these). const result = await messaging.emit({ - topic, - title, - body, + topic: topic ?? 'notify', + audience: recipients, + payload: { ...(payload ?? {}), title, body, url: actionUrl }, severity, - recipients, channels: channels.length ? channels : undefined, - actionUrl, - payload, }); return { success: true, - output: { delivered: result.delivered, failed: result.failed }, + output: { + notificationId: result.notificationId, + delivered: result.delivered, + failed: result.failed, + }, }; } catch (err) { return { success: false, error: `notify failed: ${(err as Error).message}` }; diff --git a/packages/services/service-messaging/src/channel.ts b/packages/services/service-messaging/src/channel.ts index 41489601f..d608f5494 100644 --- a/packages/services/service-messaging/src/channel.ts +++ b/packages/services/service-messaging/src/channel.ts @@ -17,8 +17,19 @@ * transport — it writes a row in our own DB — so it needs no connector. */ -/** A platform → user notification, before fan-out to channels. */ +/** + * A platform → user notification, before fan-out to channels. + * + * This is the *internal* per-recipient unit the service hands to channels. The + * public ingress is {@link EmitInput} on `MessagingService.emit`, which writes + * the L2 `sys_notification` event first and then derives one of these per + * resolved recipient. + */ export interface Notification { + /** Id of the L2 `sys_notification` event this delivery materializes. */ + readonly notificationId?: string; + /** Tenant stamp propagated to materialization rows so RLS matches the recipient. */ + readonly organizationId?: string; /** Topic id, e.g. `contract.approval_requested`. Optional in M1-minimal. */ readonly topic?: string; /** Short headline shown in the inbox / email subject / push title. */ diff --git a/packages/services/service-messaging/src/inbox-channel.test.ts b/packages/services/service-messaging/src/inbox-channel.test.ts index 44773290d..54eb2e004 100644 --- a/packages/services/service-messaging/src/inbox-channel.test.ts +++ b/packages/services/service-messaging/src/inbox-channel.test.ts @@ -1,7 +1,7 @@ // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. import { describe, it, expect } from 'vitest'; -import { createInboxChannel, INBOX_OBJECT } from './inbox-channel.js'; +import { createInboxChannel, INBOX_OBJECT, RECEIPT_OBJECT } from './inbox-channel.js'; import type { Delivery } from './channel.js'; function silentCtx() { @@ -64,20 +64,65 @@ describe('inbox channel', () => { expect(result.ok).toBe(true); expect(result.externalId).toBe('inbox_1'); + // No notificationId on this delivery → no receipt; just the inbox row. expect(data.inserts).toHaveLength(1); expect(data.inserts[0].object).toBe(INBOX_OBJECT); expect(data.inserts[0].row).toEqual({ user_id: 'user_42', + notification_id: null, topic: 'deal.won', title: 'Deal closed', body_md: 'Acme signed 🎉', severity: 'info', action_url: '/opportunities/42', - read: false, + organization_id: null, created_at: '2026-06-01T00:00:00.000Z', }); }); + it('writes the inbox row + a delivered receipt when the event id is present', async () => { + const data = fakeData(); + const ch = createInboxChannel({ getData: () => data.engine, now: () => '2026-06-01T00:00:00.000Z' }); + + await ch.send( + silentCtx(), + delivery({ notificationId: 'evt_9', organizationId: 'org_1' }, 'user_42'), + ); + + expect(data.inserts.map((i) => i.object)).toEqual([INBOX_OBJECT, RECEIPT_OBJECT]); + expect(data.inserts[0].row).toMatchObject({ + user_id: 'user_42', + notification_id: 'evt_9', + organization_id: 'org_1', + }); + expect(data.inserts[1].row).toEqual({ + notification_id: 'evt_9', + delivery_id: null, + user_id: 'user_42', + channel: 'inbox', + state: 'delivered', + at: '2026-06-01T00:00:00.000Z', + organization_id: 'org_1', + created_at: '2026-06-01T00:00:00.000Z', + }); + }); + + it('still delivers the inbox row when the receipt write fails (best-effort)', async () => { + let calls = 0; + const ch = createInboxChannel({ + getData: () => fakeData((obj) => { + calls += 1; + if (obj === RECEIPT_OBJECT) throw new Error('receipt table locked'); + return { id: 'inbox_1' }; + }).engine, + now: () => '2026-06-01T00:00:00.000Z', + }); + const result = await ch.send(silentCtx(), delivery({ notificationId: 'evt_9' }, 'user_42')); + expect(result.ok).toBe(true); + expect(result.externalId).toBe('inbox_1'); + expect(calls).toBe(2); // inbox insert + attempted receipt insert + }); + it('defaults severity to info when the notification omits it', async () => { const data = fakeData(); const ch = createInboxChannel({ getData: () => data.engine }); diff --git a/packages/services/service-messaging/src/inbox-channel.ts b/packages/services/service-messaging/src/inbox-channel.ts index f62282abc..57c64f32a 100644 --- a/packages/services/service-messaging/src/inbox-channel.ts +++ b/packages/services/service-messaging/src/inbox-channel.ts @@ -12,6 +12,9 @@ import type { /** The object the inbox channel writes rows to. */ export const INBOX_OBJECT = 'sys_inbox_message'; +/** The receipt object the inbox channel writes a `delivered` row to (ADR-0030). */ +export const RECEIPT_OBJECT = 'sys_notification_receipt'; + /** The user identity object an email-shaped recipient is resolved against. */ export const USER_OBJECT = 'sys_user'; @@ -28,6 +31,8 @@ export interface InboxChannelOptions { getData(): IDataEngine | undefined; /** Object name override (default {@link INBOX_OBJECT}). */ objectName?: string; + /** Receipt object name override (default {@link RECEIPT_OBJECT}). */ + receiptObject?: string; /** * User identity object used to resolve an email-shaped recipient to its * id (default {@link USER_OBJECT}). The inbox is keyed by user id, but @@ -51,6 +56,7 @@ export interface InboxChannelOptions { */ export function createInboxChannel(opts: InboxChannelOptions): MessagingChannel { const objectName = opts.objectName ?? INBOX_OBJECT; + const receiptObject = opts.receiptObject ?? RECEIPT_OBJECT; const userObject = opts.userObject ?? USER_OBJECT; const now = opts.now ?? (() => new Date().toISOString()); @@ -84,6 +90,36 @@ export function createInboxChannel(opts: InboxChannelOptions): MessagingChannel } } + /** + * Write the `delivered` receipt for an inbox materialization. Best-effort: + * receipts are the read-state spine but a failure here must never turn a + * delivered message into a failed one — we log and move on. Skipped when the + * event id is absent (a synthetic/minimal stack with nothing to key on). + */ + async function writeDeliveredReceipt( + ctx: MessagingChannelContext, + data: IDataEngine, + r: { notificationId?: string; userId: string; organizationId?: string; at: string }, + ): Promise { + if (!r.notificationId) return; + try { + await data.insert(receiptObject, { + notification_id: r.notificationId, + delivery_id: null, + user_id: r.userId, + channel: 'inbox', + state: 'delivered', + at: r.at, + organization_id: r.organizationId ?? null, + created_at: r.at, + }); + } catch (err) { + ctx.logger.warn( + `[inbox] delivered receipt write failed for '${r.userId}' (${(err as Error).message}); inbox row stands`, + ); + } + } + return { id: 'inbox', @@ -99,25 +135,39 @@ export function createInboxChannel(opts: InboxChannelOptions): MessagingChannel } const userId = await resolveRecipient(ctx, data, delivery.recipient); + const at = now(); const row: Record = { user_id: userId, + notification_id: n.notificationId ?? null, topic: n.topic, title: n.title, body_md: n.body, severity: n.severity ?? 'info', action_url: n.actionUrl, - read: false, - created_at: now(), + organization_id: n.organizationId ?? null, + created_at: at, }; + let inboxId: string | undefined; try { const created = await data.insert(objectName, row); const id = Array.isArray(created) ? created[0]?.id : created?.id ?? created; - return { ok: true, externalId: id != null ? String(id) : undefined }; + inboxId = id != null ? String(id) : undefined; } catch (err) { return { ok: false, error: `inbox insert failed: ${(err as Error).message}` }; } + + // Read-state lives in the receipt (ADR-0030), not on the inbox row. + // Best-effort: a missing receipt must not fail a delivered message. + await writeDeliveredReceipt(ctx, data, { + notificationId: n.notificationId, + userId, + organizationId: n.organizationId, + at, + }); + + return { ok: true, externalId: inboxId }; }, classifyError(_err: unknown): ErrorClass { diff --git a/packages/services/service-messaging/src/index.ts b/packages/services/service-messaging/src/index.ts index 620fbf5dc..1b7c169bd 100644 --- a/packages/services/service-messaging/src/index.ts +++ b/packages/services/service-messaging/src/index.ts @@ -19,11 +19,18 @@ export { MessagingServicePlugin } from './messaging-service-plugin.js'; export type { MessagingServicePluginOptions } from './messaging-service-plugin.js'; // Service + types -export { MessagingService } from './messaging-service.js'; -export type { DeliveryOutcome, EmitResult } from './messaging-service.js'; +export { MessagingService, NOTIFICATION_EVENT_OBJECT } from './messaging-service.js'; +export type { + DeliveryOutcome, + EmitResult, + EmitInput, + Audience, + AudienceSpec, + MessagingServiceContext, +} from './messaging-service.js'; // Channel seam -export { createInboxChannel, INBOX_OBJECT } from './inbox-channel.js'; +export { createInboxChannel, INBOX_OBJECT, RECEIPT_OBJECT } from './inbox-channel.js'; export type { InboxChannelOptions } from './inbox-channel.js'; export type { MessagingChannel, @@ -35,4 +42,4 @@ export type { } from './channel.js'; // Objects (metadata definitions) -export { InboxMessage } from './objects/index.js'; +export { InboxMessage, NotificationReceipt } from './objects/index.js'; diff --git a/packages/services/service-messaging/src/messaging-service-plugin.test.ts b/packages/services/service-messaging/src/messaging-service-plugin.test.ts index 1ac023df9..df77830d2 100644 --- a/packages/services/service-messaging/src/messaging-service-plugin.test.ts +++ b/packages/services/service-messaging/src/messaging-service-plugin.test.ts @@ -54,30 +54,36 @@ describe('MessagingServicePlugin', () => { expect(messaging.getRegisteredChannels()).toEqual(['inbox']); }); - it('registers the sys_inbox_message object via the manifest service', async () => { + it('registers the sys_inbox_message + sys_notification_receipt objects via the manifest service', async () => { const { ctx, registeredObjects } = fakeCtx(); await new MessagingServicePlugin().init(ctx); const names = registeredObjects.map((o: any) => o?.name); expect(names).toContain('sys_inbox_message'); + expect(names).toContain('sys_notification_receipt'); }); - it('end-to-end: emit() through the registered service writes an inbox row', async () => { + it('end-to-end: emit() writes the L2 event, the inbox row, and the receipt', async () => { const { ctx, services, inserts } = fakeCtx(); await new MessagingServicePlugin().init(ctx); const messaging = services.get('messaging') as MessagingService; const result = await messaging.emit({ topic: 'deal.won', - title: 'Deal closed', - body: 'Acme signed 🎉', - recipients: ['user_1'], + audience: ['user_1'], + payload: { title: 'Deal closed', body: 'Acme signed 🎉' }, }); expect(result.delivered).toBe(1); - expect(inserts).toHaveLength(1); - expect(inserts[0].object).toBe('sys_inbox_message'); - expect(inserts[0].row).toMatchObject({ user_id: 'user_1', title: 'Deal closed', read: false }); + const objs = inserts.map((i) => i.object); + expect(objs).toEqual(['sys_notification', 'sys_inbox_message', 'sys_notification_receipt']); + + // The event row id threads through to the materialization + receipt. + expect(result.notificationId).toBe('row_1'); + const inbox = inserts.find((i) => i.object === 'sys_inbox_message')!; + expect(inbox.row).toMatchObject({ user_id: 'user_1', title: 'Deal closed', notification_id: 'row_1' }); + const receipt = inserts.find((i) => i.object === 'sys_notification_receipt')!; + expect(receipt.row).toMatchObject({ notification_id: 'row_1', user_id: 'user_1', channel: 'inbox', state: 'delivered' }); }); it('can be constructed without the inbox channel for an empty registry', async () => { diff --git a/packages/services/service-messaging/src/messaging-service-plugin.ts b/packages/services/service-messaging/src/messaging-service-plugin.ts index 3848f2193..1208d9edb 100644 --- a/packages/services/service-messaging/src/messaging-service-plugin.ts +++ b/packages/services/service-messaging/src/messaging-service-plugin.ts @@ -4,7 +4,7 @@ import type { Plugin, PluginContext } from '@objectstack/core'; import type { IDataEngine } from '@objectstack/spec/contracts'; import { MessagingService } from './messaging-service.js'; import { createInboxChannel } from './inbox-channel.js'; -import { InboxMessage } from './objects/index.js'; +import { InboxMessage, NotificationReceipt } from './objects/index.js'; export interface MessagingServicePluginOptions { /** @@ -45,19 +45,24 @@ export class MessagingServicePlugin implements Plugin { } async init(ctx: PluginContext): Promise { - const service = new MessagingService({ logger: ctx.logger }); + // Shared lazy data-engine resolver — used both to persist the L2 + // `sys_notification` event in `emit()` and by the inbox channel to + // materialize rows. Resolved lazily so it works regardless of plugin + // init order. + const getData = (): IDataEngine | undefined => { + try { + return ( + ctx.getService('data') ?? + ctx.getService('objectql') + ); + } catch { + return undefined; + } + }; + + const service = new MessagingService({ logger: ctx.logger, getData }); if (this.options.registerInbox) { - const getData = (): IDataEngine | undefined => { - try { - return ( - ctx.getService('data') ?? - ctx.getService('objectql') - ); - } catch { - return undefined; - } - }; service.registerChannel(createInboxChannel({ getData })); } @@ -70,7 +75,7 @@ export class MessagingServicePlugin implements Plugin { version: '1.0.0', type: 'plugin', scope: 'system', - objects: [InboxMessage], + objects: [InboxMessage, NotificationReceipt], }); ctx.logger.info( diff --git a/packages/services/service-messaging/src/messaging-service.test.ts b/packages/services/service-messaging/src/messaging-service.test.ts index 7cf19627c..e03f1a284 100644 --- a/packages/services/service-messaging/src/messaging-service.test.ts +++ b/packages/services/service-messaging/src/messaging-service.test.ts @@ -26,6 +26,31 @@ function recordingChannel(id: string, result: SendResult = { ok: true }): { }; } +/** A fake data engine capturing event inserts (and optionally a dedup hit). */ +function fakeData(findOneImpl?: (obj: string, q: any) => any) { + const inserts: Array<{ object: string; row: any }> = []; + const findOnes: Array<{ object: string; query: any }> = []; + return { + inserts, + findOnes, + getData: () => ({ + async insert(object: string, row: any) { + inserts.push({ object, row }); + return { id: `evt_${inserts.length}`, ...row }; + }, + async find() { return []; }, + async findOne(object: string, query: any) { + findOnes.push({ object, query }); + return findOneImpl ? findOneImpl(object, query) : null; + }, + async update() { return {}; }, + async delete() { return {}; }, + async count() { return 0; }, + async aggregate() { return []; }, + }) as any, + }; +} + describe('MessagingService', () => { let service: MessagingService; @@ -59,24 +84,54 @@ describe('MessagingService', () => { }); }); - describe('emit() fan-out', () => { - it('defaults to the inbox channel and one delivery per recipient', async () => { + describe('emit() ingress + fan-out', () => { + it('defaults to the inbox channel and one delivery per resolved recipient', async () => { const inbox = recordingChannel('inbox', { ok: true, externalId: 'row_1' }); service.registerChannel(inbox.channel); const result = await service.emit({ - title: 'Deal closed', - body: 'Acme signed 🎉', - recipients: ['user_1', 'user_2'], + topic: 'deal.won', + audience: ['user_1', 'user_2'], + payload: { title: 'Deal closed', body: 'Acme signed 🎉' }, }); expect(inbox.seen.map((d) => d.recipient)).toEqual(['user_1', 'user_2']); expect(inbox.seen[0].channel).toBe('inbox'); + expect(inbox.seen[0].notification.title).toBe('Deal closed'); expect(result.delivered).toBe(2); expect(result.failed).toBe(0); + expect(result.notificationId).toMatch(/^evt_/); // synthesized w/o data layer expect(result.deliveries[0]).toMatchObject({ channel: 'inbox', recipient: 'user_1', ok: true, externalId: 'row_1' }); }); + it('accepts a single (non-array) audience entry', async () => { + const inbox = recordingChannel('inbox'); + service.registerChannel(inbox.channel); + const result = await service.emit({ topic: 't', audience: 'user_9', payload: { title: 'Hi' } }); + expect(inbox.seen.map((d) => d.recipient)).toEqual(['user_9']); + expect(result.delivered).toBe(1); + }); + + it('de-duplicates repeated recipients in the audience', async () => { + const inbox = recordingChannel('inbox'); + service.registerChannel(inbox.channel); + await service.emit({ topic: 't', audience: ['user_1', 'user_1'], payload: { title: 'Hi' } }); + expect(inbox.seen.map((d) => d.recipient)).toEqual(['user_1']); + }); + + it('skips deferred role:/team:/owner_of: selectors until P1 (no recipients)', async () => { + const inbox = recordingChannel('inbox'); + service.registerChannel(inbox.channel); + const result = await service.emit({ + topic: 't', + audience: ['role:admin', 'team:sales', { ownerOf: { object: 'lead', id: 'l1' } }], + payload: { title: 'Hi' }, + }); + expect(inbox.seen).toHaveLength(0); + expect(result.delivered).toBe(0); + expect(result.failed).toBe(0); + }); + it('fans out across every requested channel', async () => { const inbox = recordingChannel('inbox'); const email = recordingChannel('email'); @@ -84,10 +139,10 @@ describe('MessagingService', () => { service.registerChannel(email.channel); const result = await service.emit({ - title: 'Hi', - body: 'there', - recipients: ['user_1'], + topic: 't', + audience: ['user_1'], channels: ['inbox', 'email'], + payload: { title: 'Hi', body: 'there' }, }); expect(inbox.seen).toHaveLength(1); @@ -97,10 +152,10 @@ describe('MessagingService', () => { it('reports a failed delivery per recipient when a channel is unregistered, without throwing', async () => { const result = await service.emit({ - title: 'Hi', - body: 'there', - recipients: ['user_1', 'user_2'], + topic: 't', + audience: ['user_1', 'user_2'], channels: ['email'], + payload: { title: 'Hi' }, }); expect(result.delivered).toBe(0); expect(result.failed).toBe(2); @@ -114,16 +169,66 @@ describe('MessagingService', () => { throw new Error('boom'); }, }); - const result = await service.emit({ title: 'x', body: 'y', recipients: ['user_1'] }); + const result = await service.emit({ topic: 't', audience: ['user_1'], payload: { title: 'x' } }); expect(result.failed).toBe(1); expect(result.deliveries[0].error).toContain('boom'); }); it('surfaces a channel-reported failure (ok:false)', async () => { service.registerChannel(recordingChannel('inbox', { ok: false, error: 'quota exceeded' }).channel); - const result = await service.emit({ title: 'x', body: 'y', recipients: ['user_1'] }); + const result = await service.emit({ topic: 't', audience: ['user_1'], payload: { title: 'x' } }); expect(result.failed).toBe(1); expect(result.deliveries[0].error).toBe('quota exceeded'); }); }); + + describe('emit() L2 event persistence', () => { + it('writes one sys_notification event row carrying topic/payload/severity/source/actor', async () => { + const data = fakeData(); + service = new MessagingService({ logger: silentLogger(), getData: data.getData, now: () => '2026-06-01T00:00:00.000Z' }); + service.registerChannel(recordingChannel('inbox').channel); + + const result = await service.emit({ + topic: 'task.assigned', + audience: ['user_1'], + severity: 'warning', + source: { object: 'task', id: 't_7' }, + actorId: 'user_admin', + organizationId: 'org_1', + payload: { title: 'Assigned' }, + }); + + const event = data.inserts.find((i) => i.object === 'sys_notification'); + expect(event).toBeDefined(); + expect(event!.row).toMatchObject({ + topic: 'task.assigned', + severity: 'warning', + source_object: 'task', + source_id: 't_7', + actor_id: 'user_admin', + organization_id: 'org_1', + created_at: '2026-06-01T00:00:00.000Z', + }); + expect(result.notificationId).toBe('evt_1'); + }); + + it('is idempotent on dedupKey — a matching prior event skips fan-out', async () => { + const data = fakeData((obj) => (obj === 'sys_notification' ? { id: 'evt_existing' } : null)); + service = new MessagingService({ logger: silentLogger(), getData: data.getData }); + const inbox = recordingChannel('inbox'); + service.registerChannel(inbox.channel); + + const result = await service.emit({ + topic: 'task.assigned', + audience: ['user_1'], + dedupKey: 'task.assigned:t_7:user_1', + payload: { title: 'Assigned' }, + }); + + expect(result.deduped).toBe(true); + expect(result.notificationId).toBe('evt_existing'); + expect(inbox.seen).toHaveLength(0); // no re-fan + expect(data.inserts.some((i) => i.object === 'sys_notification')).toBe(false); + }); + }); }); diff --git a/packages/services/service-messaging/src/messaging-service.ts b/packages/services/service-messaging/src/messaging-service.ts index 92d8d7394..377c27955 100644 --- a/packages/services/service-messaging/src/messaging-service.ts +++ b/packages/services/service-messaging/src/messaging-service.ts @@ -1,11 +1,54 @@ // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. +import type { IDataEngine } from '@objectstack/spec/contracts'; import type { MessagingChannel, MessagingChannelContext, Notification, } from './channel.js'; +/** The L2 event object every `emit()` writes one row to (ADR-0030). */ +export const NOTIFICATION_EVENT_OBJECT = 'sys_notification'; + +/** + * Audience selector for {@link EmitInput}. P0 resolves explicit user ids and + * email-shaped recipients inline (email→id is finished at the inbox channel); + * `role:` / `team:` / `owner_of:` selectors are forwarded but only fully + * expanded by the `RecipientResolver` in P1. + */ +export type AudienceSpec = + | string // user id | email | 'role:x' | 'team:x' + | { ownerOf: { object: string; id: string } }; + +export type Audience = AudienceSpec | readonly AudienceSpec[]; + +/** + * The single notification ingress (ADR-0030 §3). Every producer — the flow + * `notify` node, collaboration `@mention`, record assignment, system alerts — + * calls `emit()` with this shape. No producer writes a per-user inbox row + * directly; the in-app inbox is a *materialization of delivery*. + */ +export interface EmitInput { + /** Topic id, e.g. `task.assigned`, `collab.mention`. */ + readonly topic: string; + /** Who should receive it. See {@link Audience}. */ + readonly audience: Audience; + /** Template inputs carried to channels (title/body/url/actor/source/…). */ + readonly payload?: Record; + /** Severity hint for rendering / filtering. */ + readonly severity?: 'info' | 'warning' | 'critical'; + /** Idempotency key within a topic window; a repeat `emit` is a no-op. */ + readonly dedupKey?: string; + /** The record that triggered the event. */ + readonly source?: { object: string; id: string }; + /** User who caused the event (mentioner, assigner). */ + readonly actorId?: string; + /** Tenant stamp for sudo/background paths so RLS matches recipients. */ + readonly organizationId?: string; + /** Channels to fan out to. Defaults to `['inbox']` (always on). */ + readonly channels?: string[]; +} + /** Per-delivery outcome record returned from {@link MessagingService.emit}. */ export interface DeliveryOutcome { readonly channel: string; @@ -17,30 +60,55 @@ export interface DeliveryOutcome { /** Aggregate result of fanning one notification out to its channels. */ export interface EmitResult { + /** Id of the L2 `sys_notification` event written (or matched, on dedup). */ + readonly notificationId: string; + /** True when `dedupKey` matched an existing event and fan-out was skipped. */ + readonly deduped: boolean; readonly deliveries: DeliveryOutcome[]; readonly delivered: number; readonly failed: number; } +/** Context the service needs: a logger, plus data access for the L2 event. */ +export interface MessagingServiceContext extends MessagingChannelContext { + /** + * Resolve the runtime data engine used to persist the L2 event. Returns + * `undefined` on a minimal/test stack with no data layer — `emit()` then + * uses a synthetic id and warns rather than throwing, matching the + * platform's CRUD-node degradation. + */ + getData?(): IDataEngine | undefined; + /** Clock injection for deterministic tests. Defaults to `new Date()`. */ + now?(): string; +} + +/** Selector prefixes the inline resolver forwards verbatim (expanded in P1). */ +const DEFERRED_SELECTOR = /^(role:|team:|owner_of:)/; + /** - * MessagingService — the M1-minimal outbound dispatcher (ADR-0012). + * MessagingService — the notification dispatcher (ADR-0012 / ADR-0030). * - * Holds the `MessagingChannel` registry and implements `emit()`: it fans a - * notification out to `(channel × recipient)` deliveries and calls each - * channel's `send()`. The always-on `inbox` channel is registered by the - * plugin; other channels (email/webhook/push/IM) register themselves. + * Holds the `MessagingChannel` registry and implements the single ingress + * `emit(EmitInput)`: + * 1. write the L2 `sys_notification` event (idempotent on `dedupKey`); + * 2. resolve the audience to recipient ids (inline in P0; `RecipientResolver` + * owns role/team/owner_of expansion in P1); + * 3. fan `(channel × recipient)` deliveries and call each channel's `send()`, + * which materializes the artifact (inbox row + receipt, email, …). * - * Deliberately *not* in this milestone (see ADR-0012 §M1 vs the deferred - * scope): the durable outbox, retry schedule, cluster-lock, dead-letter, the - * topic catalog, the per-user preference matrix, renderers, and middleware. - * `emit()` is synchronous best-effort fan-out; failures are reported in the - * result rather than persisted for retry. The seam (`MessagingChannel`, - * `Notification`) is shaped so those land without breaking callers. + * Deliberately *not* in this phase (ADR-0030 P1+): the durable outbox, retry + * schedule, cluster-lock, dead-letter, the per-user preference matrix, + * renderers/templates, and digest middleware. `emit()` is best-effort fan-out; + * failures are reported in the result. The seams are shaped so those land + * without breaking callers. */ export class MessagingService { private readonly channels = new Map(); + private readonly now: () => string; - constructor(private readonly ctx: MessagingChannelContext) {} + constructor(private readonly ctx: MessagingServiceContext) { + this.now = ctx.now ?? (() => new Date().toISOString()); + } /** Register a channel implementation. A duplicate id warns and replaces. */ registerChannel(channel: MessagingChannel): void { @@ -66,15 +134,137 @@ export class MessagingService { return [...this.channels.keys()]; } + /** + * The single notification ingress. Writes the L2 event, resolves the + * audience, and fans the result out to its channels. An unregistered + * channel, or a channel that throws, is reported as a failed delivery — it + * never aborts the rest of the fan-out. A `dedupKey` that matches an + * existing event short-circuits: the event id is returned and no new + * deliveries are produced. + */ + async emit(input: EmitInput): Promise { + const data = this.ctx.getData?.(); + + // 1) Idempotency — a prior event with the same dedup_key is a no-op. + if (input.dedupKey && data) { + const existing = await this.findEventByDedupKey(data, input.dedupKey); + if (existing) { + this.ctx.logger.info( + `[messaging] emit: dedupKey '${input.dedupKey}' already emitted (${existing}); skipping`, + ); + return { notificationId: existing, deduped: true, deliveries: [], delivered: 0, failed: 0 }; + } + } + + // 2) Write the L2 event (or synthesize an id when there is no data layer). + const notificationId = await this.writeEvent(data, input); + + // 3) Resolve the audience to recipients. + const recipients = this.resolveRecipients(input.audience); + if (recipients.length === 0) { + this.ctx.logger.warn(`[messaging] emit: topic '${input.topic}' resolved to 0 recipients`); + return { notificationId, deduped: false, deliveries: [], delivered: 0, failed: 0 }; + } + + // 4) Derive the per-recipient notification and fan out to channels. + const payload = input.payload ?? {}; + const notification: Notification = { + notificationId, + organizationId: input.organizationId, + topic: input.topic, + title: str(payload.title) ?? input.topic, + body: str(payload.body) ?? '', + severity: input.severity ?? 'info', + recipients, + channels: input.channels, + actionUrl: str(payload.url) ?? str(payload.actionUrl), + payload: input.payload, + }; + + const { deliveries, delivered, failed } = await this.fanOut(notification, recipients); + return { notificationId, deduped: false, deliveries, delivered, failed }; + } + + /** Find an existing event id by its dedup key, tolerating lookup failure. */ + private async findEventByDedupKey(data: IDataEngine, dedupKey: string): Promise { + try { + const row = await data.findOne(NOTIFICATION_EVENT_OBJECT, { + where: { dedup_key: dedupKey }, + fields: ['id'], + }); + const id = row?.id; + return id != null && String(id).length > 0 ? String(id) : undefined; + } catch (err) { + this.ctx.logger.warn(`[messaging] dedup lookup failed (${(err as Error).message}); proceeding`); + return undefined; + } + } + + /** + * Persist the L2 event and return its id. With no data layer (minimal/test + * stacks) we warn and synthesize an id so fan-out can still be exercised. + */ + private async writeEvent(data: IDataEngine | undefined, input: EmitInput): Promise { + if (!data) { + this.ctx.logger.warn('[messaging] no data engine registered; event not persisted'); + return `evt_${Math.random().toString(36).slice(2)}`; + } + const row: Record = { + topic: input.topic, + payload: input.payload ?? null, + severity: input.severity ?? 'info', + dedup_key: input.dedupKey ?? null, + // Normalize empty strings to null so the (source_object, source_id) + // index keys on real ids, never '' (producers may pass a bare object + // with no id — e.g. a comment thread_id with no record part). + source_object: str(input.source?.object) ?? null, + source_id: str(input.source?.id) ?? null, + actor_id: input.actorId ?? null, + organization_id: input.organizationId ?? null, + created_at: this.now(), + }; + const created = await data.insert(NOTIFICATION_EVENT_OBJECT, row); + const id = Array.isArray(created) ? created[0]?.id : created?.id ?? created; + return id != null ? String(id) : `evt_${Math.random().toString(36).slice(2)}`; + } + + /** + * Expand an audience to a flat recipient list. P0 keeps explicit ids and + * email-shaped entries (email→id finishes at the inbox channel); deferred + * `role:`/`team:`/`owner_of:` selectors warn and are dropped until the P1 + * `RecipientResolver` lands. + */ + private resolveRecipients(audience: Audience): string[] { + const specs = Array.isArray(audience) ? audience : [audience as AudienceSpec]; + const out: string[] = []; + for (const spec of specs) { + if (typeof spec !== 'string') { + this.ctx.logger.warn( + `[messaging] owner_of: audience resolution lands in P1; '${JSON.stringify(spec)}' skipped`, + ); + continue; + } + if (DEFERRED_SELECTOR.test(spec)) { + this.ctx.logger.warn(`[messaging] audience selector '${spec}' resolution lands in P1; skipped`); + continue; + } + if (spec.trim()) out.push(spec.trim()); + } + // De-dup while preserving order. + return [...new Set(out)]; + } + /** * Fan a notification out to its channels and recipients. Each * `(channel, recipient)` pair becomes one `send()` call. An unregistered - * channel, or a channel that throws, is reported as a failed delivery — - * it never aborts the rest of the fan-out. + * channel, or a channel that throws, is reported as a failed delivery — it + * never aborts the rest of the fan-out. */ - async emit(notification: Notification): Promise { + private async fanOut( + notification: Notification, + recipients: string[], + ): Promise<{ deliveries: DeliveryOutcome[]; delivered: number; failed: number }> { const channels = notification.channels?.length ? notification.channels : ['inbox']; - const recipients = notification.recipients ?? []; const deliveries: DeliveryOutcome[] = []; for (const channelId of channels) { @@ -122,3 +312,10 @@ export class MessagingService { return { deliveries, delivered, failed: deliveries.length - delivered }; } } + +/** Coerce a payload value to a non-empty string, else `undefined`. */ +function str(v: unknown): string | undefined { + if (v == null) return undefined; + const s = String(v); + return s.length > 0 ? s : undefined; +} diff --git a/packages/services/service-messaging/src/objects/inbox-message.object.ts b/packages/services/service-messaging/src/objects/inbox-message.object.ts index bf65ccdb5..4946f3ff0 100644 --- a/packages/services/service-messaging/src/objects/inbox-message.object.ts +++ b/packages/services/service-messaging/src/objects/inbox-message.object.ts @@ -3,21 +3,42 @@ import { ObjectSchema, Field } from '@objectstack/spec/data'; /** - * `sys_inbox_message` — user-facing in-app notification rows (ADR-0012 §4, §11). + * `sys_inbox_message` — user-facing in-app notification rows (ADR-0012 §4, §11; + * ADR-0030 Layer 5). * * Written by the always-on `inbox` messaging channel, one row per - * `(notification, recipient)`. The client pulls these for the in-app inbox; + * `(notification, recipient)`, as the **materialization** of an L2 + * `sys_notification` event delivered to the in-app channel. The Console bell + * pulls these (ADR-0030 re-points the bell here from `sys_notification`); * `service-realtime` decides when to ping an online user. Belongs to * `service-messaging` per the "protocol + service ownership" pattern. + * + * Read-state is **not** stored here — it lives in `sys_notification_receipt` + * (ADR-0030), so cross-channel read semantics stay reachable. `notification_id` + * links back to the event; `delivery_id` links to the outbox row (P1). */ export const InboxMessage = ObjectSchema.create({ name: 'sys_inbox_message', label: 'Inbox Message', pluralLabel: 'Inbox Messages', icon: 'inbox', - description: 'User-facing in-app notification rows written by the inbox messaging channel.', + description: 'User-facing in-app notification rows materialized by the inbox messaging channel.', titleFormat: '{title}', - compactLayout: ['title', 'user_id', 'severity', 'read', 'created_at'], + compactLayout: ['title', 'user_id', 'severity', 'created_at'], + + listViews: { + mine: { + type: 'grid', + name: 'mine', + label: 'Notifications', + data: { provider: 'object', object: 'sys_inbox_message' }, + columns: ['title', 'topic', 'severity', 'created_at'], + filter: [{ field: 'user_id', operator: 'equals', value: '{current_user_id}' }], + sort: [{ field: 'created_at', order: 'desc' }], + pagination: { pageSize: 50 }, + emptyState: { title: 'Inbox zero', message: 'No notifications.' }, + }, + }, fields: { id: Field.text({ @@ -32,6 +53,17 @@ export const InboxMessage = ObjectSchema.create({ searchable: true, }), + notification_id: Field.text({ + label: 'Notification Event', + searchable: true, + description: 'FK → sys_notification (the L2 event this row materializes)', + }), + + delivery_id: Field.text({ + label: 'Delivery', + description: 'FK → sys_notification_delivery (outbox row); null until P1', + }), + topic: Field.text({ label: 'Topic', searchable: true, @@ -59,10 +91,6 @@ export const InboxMessage = ObjectSchema.create({ label: 'Action URL', }), - read: Field.boolean({ - label: 'Read', - }), - created_at: Field.datetime({ label: 'Created At', readonly: true, diff --git a/packages/services/service-messaging/src/objects/index.ts b/packages/services/service-messaging/src/objects/index.ts index 7e726d57b..b4d8882cc 100644 --- a/packages/services/service-messaging/src/objects/index.ts +++ b/packages/services/service-messaging/src/objects/index.ts @@ -1,3 +1,4 @@ // Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. export { InboxMessage } from './inbox-message.object.js'; +export { NotificationReceipt } from './notification-receipt.object.js'; diff --git a/packages/services/service-messaging/src/objects/notification-receipt.object.ts b/packages/services/service-messaging/src/objects/notification-receipt.object.ts new file mode 100644 index 000000000..2705dd92d --- /dev/null +++ b/packages/services/service-messaging/src/objects/notification-receipt.object.ts @@ -0,0 +1,87 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { ObjectSchema, Field } from '@objectstack/spec/data'; + +/** + * `sys_notification_receipt` — per-recipient × channel delivery receipt + * (ADR-0030 Layer 5). + * + * The single source of truth for **read-state**. When a channel materializes a + * delivery it writes a `delivered` receipt; the UI flips it to `read` / + * `clicked` / `dismissed`. Keeping read-state here (rather than on + * `sys_inbox_message`) makes cross-channel semantics reachable later — e.g. + * "clicked the email → mark the inbox row read" is a receipt update, not a + * second source of truth. + * + * Keyed by `(notification_id, user_id, channel)`. `delivery_id` links to the + * `sys_notification_delivery` outbox row once that lands (P1); it is nullable + * in P0 where the inbox channel dispatches inline. + * + * Belongs to `service-messaging` (the owner of the materialization channels). + */ +export const NotificationReceipt = ObjectSchema.create({ + name: 'sys_notification_receipt', + label: 'Notification Receipt', + pluralLabel: 'Notification Receipts', + icon: 'check-check', + isSystem: true, + managedBy: 'system', + description: 'Per-recipient × channel receipt; the source of truth for notification read-state.', + titleFormat: '{state}', + compactLayout: ['notification_id', 'user_id', 'channel', 'state', 'at'], + + fields: { + id: Field.text({ + label: 'Receipt ID', + required: true, + readonly: true, + }), + + notification_id: Field.text({ + label: 'Notification Event', + required: true, + searchable: true, + description: 'FK → sys_notification (L2 event)', + }), + + delivery_id: Field.text({ + label: 'Delivery', + required: false, + description: 'FK → sys_notification_delivery (outbox row); null until P1', + }), + + user_id: Field.text({ + label: 'Recipient User', + required: true, + searchable: true, + }), + + channel: Field.text({ + label: 'Channel', + required: true, + description: 'Channel id this receipt is for (inbox / email / push / …)', + }), + + state: Field.select(['delivered', 'read', 'clicked', 'dismissed'], { + label: 'State', + required: true, + defaultValue: 'delivered', + }), + + at: Field.datetime({ + label: 'At', + required: false, + description: 'When the receipt reached its current state', + }), + + created_at: Field.datetime({ + label: 'Created At', + readonly: true, + }), + }, + + indexes: [ + { fields: ['notification_id', 'user_id', 'channel'], unique: true }, + { fields: ['user_id', 'state'] }, + ], +});