Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .changeset/adr-0030-notification-p3a-email.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
---
"@objectstack/service-messaging": minor
---

ADR-0030 P3a — email channel + notification templates. The same `emit()` now
reaches inbox **and** email per the user's preferences, rendered from a
template.

- **`email` channel** (`createEmailChannel`) — a thin `MessagingChannel` that
delegates transport to the existing `email` service (ADR-0022: channel adds
messaging semantics, the email sub-system stays the transport). It resolves the
recipient user id → address (`sys_user.email`, or an email-shaped recipient
verbatim), renders, and sends. Retry/backoff/dead-letter come free from the P1
outbox dispatcher. Registered at `kernel:ready` only when an `email` service is
present; absent ⇒ no channel (an explicit `channels:['email']` then reports
"not registered" rather than silently no-opping). No-ops gracefully like the
inbox channel when the capability isn't installed.
- **`sys_notification_template`** (topic × channel × locale) + a renderer:
declarative `{{ payload.x }}` interpolation (no logic — auditable metadata),
HTML/markdown/text bodies, locale fallback (`en-US` → `en` → default), and a
**generic fallback to `payload.title`/`body`** when no template matches (so
templates are purely additive). Contributed to the Setup → Configuration nav.
- Channels are now keyed per recipient (from P2), so a notification reaches each
user on exactly the channels they accept, rendered by that channel's template.

Scope note (ADR-0022): **Slack stays a connector** (`connector-slack` already
ships the raw API path); a Slack *notification channel* needs per-user identity
mapping + OAuth and is enterprise-tier — deferred. push/webhook channels and the
digest / quiet-hours middleware (P3b) are follow-ups on the same seam.

Tests: service-messaging **85 passing** — adds `template-renderer.test.ts` and
`email-channel.test.ts` (address resolution, template vs fallback rendering,
no-service no-op, unresolved-address failure, transport-failure retry).
24 changes: 21 additions & 3 deletions docs/handoff/adr-0030-notification-convergence.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,24 @@ flipped — the inbox is being populated the whole time.)
- *Follow-ups*: subscription-driven fan-out (expand a topic's subscribers when
a producer emits without an explicit audience) is schema-only so far;
`digest`/`quiet_hours` fields exist but the batching middleware is P3.
- **P3 — Channels + templates + digest**: email/push/webhook/Slack channels on
connectors (ADR-0022); `sys_notification_template` (topic×channel×locale) +
renderer; digest / quiet-hours middleware.
- **P3 — Channels + templates + digest**: split into slices.
- **P3a — email channel + templates**: ✅ shipped. `createEmailChannel`
(delegates transport to the `email` service per ADR-0022) +
`sys_notification_template` (topic×channel×locale) + `{{ payload.x }}`
renderer with generic `payload.title`/`body` fallback. Same `emit` now
reaches inbox + email per prefs.
- **P3b — digest + quiet-hours**: pending. Plan: express both as **deferring
the delivery row's `next_attempt_at`** in the P1 outbox (digest = enqueue to
the next window + collapse same-`(user, channel, window)` rows; quiet-hours =
push `next_attempt_at` to the window end), reusing the dispatcher's
claim/retry/observability — one delivery spine, not a parallel scheduler.
Consumes the `digest`/`quiet_hours` fields P2 added. critical/mandatory
bypass. tz from `quiet_hours.tz` → `sys_user` → org/UTC.
- **Deferred (same seam, incremental)**: **Slack** stays a *connector*
(`connector-slack` ships the raw API path today); a Slack notification
*channel* needs identity mapping (`sys_channel_user_link`) + OAuth and is
enterprise-tier (ADR-0022). **push** needs `sys_user_device` + APNs/FCM.
**webhook** should reuse the existing `plugin-webhooks` outbox rather than a
redundant channel. **MJML** compilation for email (P3a treats `mjml` format
as raw HTML). **`defineTopic()`** declarative topic catalog (Studio
discoverability for topics/templates/preferences).
135 changes: 135 additions & 0 deletions packages/services/service-messaging/src/email-channel.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import { describe, it, expect } from 'vitest';
import { createEmailChannel } from './email-channel.js';
import { NotificationTemplateStore } from './template-renderer.js';
import type { Delivery } from './channel.js';

function silentCtx() {
return { logger: { info: () => {}, warn: () => {}, error: () => {} } };
}

function delivery(over: Partial<Delivery['notification']> = {}, recipient = 'user_1'): Delivery {
return {
channel: 'email',
recipient,
notification: {
notificationId: 'evt_1',
topic: 'deal.won',
title: 'Deal closed',
body: 'Acme signed',
severity: 'info',
recipients: [recipient],
payload: { title: 'Deal closed', body: 'Acme signed' },
...over,
},
};
}

/** Fake data engine: user id → email, and template lookups. */
function fakeData(opts: { users?: Record<string, string>; templates?: any[] } = {}) {
const users = opts.users ?? { user_1: 'ada@example.com' };
const templates = opts.templates ?? [];
return {
async findOne(object: string, query: any) {
const w = query?.where ?? {};
if (object === 'sys_user') {
const email = users[String(w.id)];
return email ? { email } : null;
}
if (object === 'sys_notification_template') {
return templates.find((t) => t.topic === w.topic && t.channel === w.channel && t.locale === w.locale && t.is_active) ?? null;
}
return null;
},
async find() { return []; },
async insert(_o: string, r: any) { return { id: 'x', ...r }; },
async update() { return {}; },
async delete() { return {}; },
async count() { return 0; },
async aggregate() { return []; },
} as any;
}

function fakeEmail() {
const sent: any[] = [];
return {
sent,
service: {
async send(input: any) {
sent.push(input);
return { id: 'email_row_1' };
},
},
};
}

function channel(getEmail: () => any, data: any, templates: any[] = []) {
const store = new NotificationTemplateStore({ getData: () => data });
return createEmailChannel({ getEmail, getData: () => data, store: store });
}

describe('email channel', () => {
it('has the stable id "email"', () => {
const ch = channel(() => undefined, fakeData());
expect(ch.id).toBe('email');
});

it('no-ops (success) when no email service is registered', async () => {
const ch = channel(() => undefined, fakeData());
const r = await ch.send(silentCtx(), delivery());
expect(r.ok).toBe(true);
expect(r.externalId).toBeUndefined();
});

it('resolves the recipient user id → email and sends the fallback subject/body', async () => {
const email = fakeEmail();
const ch = channel(() => email.service, fakeData({ users: { user_1: 'ada@example.com' } }));
const r = await ch.send(silentCtx(), delivery());
expect(r.ok).toBe(true);
expect(r.externalId).toBe('email_row_1');
expect(email.sent).toHaveLength(1);
expect(email.sent[0]).toEqual({ to: 'ada@example.com', subject: 'Deal closed', text: 'Acme signed' });
});

it('renders an HTML template when one exists for (topic, email, locale)', async () => {
const email = fakeEmail();
const data = fakeData({
users: { user_1: 'ada@example.com' },
templates: [{ topic: 'deal.won', channel: 'email', locale: 'en', is_active: true, subject: 'Won {{ payload.title }}', body: '<h1>{{ payload.title }}</h1>', format: 'html' }],
});
const ch = channel(() => email.service, data);
await ch.send(silentCtx(), delivery());
expect(email.sent[0]).toEqual({ to: 'ada@example.com', subject: 'Won Deal closed', html: '<h1>Deal closed</h1>' });
});

it('accepts an email-shaped recipient verbatim (no user lookup)', async () => {
const email = fakeEmail();
const ch = channel(() => email.service, fakeData({ users: {} }));
const r = await ch.send(silentCtx(), delivery({}, 'bob@example.com'));
expect(r.ok).toBe(true);
expect(email.sent[0].to).toBe('bob@example.com');
});

it('reports a failure when no address resolves (observable on the delivery row)', async () => {
const email = fakeEmail();
const ch = channel(() => email.service, fakeData({ users: {} }));
const r = await ch.send(silentCtx(), delivery({}, 'ghost'));
expect(r.ok).toBe(false);
expect(r.error).toMatch(/no email address/);
expect(email.sent).toHaveLength(0);
});

it('surfaces a transport failure as ok:false (dispatcher will retry)', async () => {
const data = fakeData();
const ch = createEmailChannel({
getEmail: () => ({ async send() { throw new Error('smtp down'); } }),
getData: () => data,
store: new NotificationTemplateStore({ getData: () => data }),
});
const r = await ch.send(silentCtx(), delivery());
expect(r.ok).toBe(false);
expect(r.error).toContain('smtp down');
expect(ch.classifyError?.(new Error('x'))).toBe('retryable');
});
});
138 changes: 138 additions & 0 deletions packages/services/service-messaging/src/email-channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import type { IDataEngine } from '@objectstack/spec/contracts';
import type {
Delivery,
ErrorClass,
MessagingChannel,
MessagingChannelContext,
SendResult,
} from './channel.js';
import {
NotificationTemplateStore,
renderNotification,
DEFAULT_LOCALE,
} from './template-renderer.js';

/** The user identity object a recipient id is resolved to an address against. */
export const USER_OBJECT = 'sys_user';

/**
* Structural view of the email service (`@objectstack/plugin-email`'s
* `EmailService`), declared locally so service-messaging takes no runtime
* dependency on it — the channel resolves whatever is registered under the
* `email` service and sends through this shape (mirrors the `notify` node's
* `MessagingServiceSurface` pattern).
*/
export interface EmailSenderSurface {
send(input: {
to: string | string[];
subject: string;
html?: string;
text?: string;
}): Promise<{ id?: string } | unknown>;
}

export interface EmailChannelOptions {
/** Resolve the email service; `undefined` ⇒ the channel no-ops (not installed). */
getEmail(): EmailSenderSurface | undefined;
/** Resolve the data engine (recipient address lookup). */
getData(): IDataEngine | undefined;
/** Template store for `(topic, 'email', locale)` rendering. */
store: NotificationTemplateStore;
/** User identity object override (default {@link USER_OBJECT}). */
userObject?: string;
/** Locale used when the delivery carries none (default {@link DEFAULT_LOCALE}). */
defaultLocale?: string;
}

const EMAIL_SHAPE = (s: string): boolean => {
// Linear, non-backtracking "looks like an email" — same shape as the
// recipient resolver's check (avoids the ReDoS-prone regex).
if (!s || /\s/.test(s)) return false;
const at = s.indexOf('@');
if (at <= 0 || at !== s.lastIndexOf('@') || at === s.length - 1) return false;
const dot = s.slice(at + 1).indexOf('.');
return dot > 0 && dot < s.length - at - 2;
};

/**
* The `email` channel (ADR-0030 P3) — delivers a notification by email.
*
* It adds only the messaging semantics on top of the existing email transport
* (ADR-0022 "channel delegates transport to a sub-system"): resolve the
* recipient's address, render `(topic, 'email', locale)` from
* `sys_notification_template` (fallback to `payload.title`/`body`), and hand the
* subject/body to the `email` service. Retry/backoff/dead-letter come for free
* from the P1 outbox dispatcher.
*
* Degrades like the inbox channel: no email service ⇒ logged no-op success
* (capability not installed); a recipient with no resolvable address ⇒ a
* reported failure (so the delivery row shows why).
*/
export function createEmailChannel(opts: EmailChannelOptions): MessagingChannel {
const userObject = opts.userObject ?? USER_OBJECT;
const defaultLocale = opts.defaultLocale ?? DEFAULT_LOCALE;

async function resolveAddress(
ctx: MessagingChannelContext,
data: IDataEngine | undefined,
recipient: string,
): Promise<string | undefined> {
if (EMAIL_SHAPE(recipient)) return recipient; // already an address
if (!data) return undefined;
try {
const user = await data.findOne(userObject, { where: { id: recipient }, fields: ['email'] });
const email = user?.email;
return typeof email === 'string' && EMAIL_SHAPE(email) ? email : undefined;
} catch (err) {
ctx.logger.warn(`[email] address lookup for '${recipient}' failed (${(err as Error).message})`);
return undefined;
}
}

return {
id: 'email',

async send(ctx: MessagingChannelContext, delivery: Delivery): Promise<SendResult> {
const email = opts.getEmail();
if (!email) {
ctx.logger.warn(`[email] no email service registered; '${delivery.recipient}' not emailed`);
return { ok: true }; // capability not installed — no-op, like inbox w/o data
}

const n = delivery.notification;
const address = await resolveAddress(ctx, opts.getData(), delivery.recipient);
if (!address) {
return { ok: false, error: `no email address for recipient '${delivery.recipient}'` };
}

const payload = (n.payload ?? {}) as Record<string, unknown>;
const locale = typeof payload.locale === 'string' ? payload.locale : defaultLocale;
const template = await opts.store.load(n.topic ?? '', 'email', locale);
const rendered = renderNotification(template, {
topic: n.topic ?? '',
payload,
title: n.title,
body: n.body,
});

try {
const result: any = await email.send({
to: address,
subject: rendered.subject,
...(rendered.html !== undefined ? { html: rendered.html } : {}),
...(rendered.text !== undefined ? { text: rendered.text } : {}),
});
const id = result?.id;
return { ok: true, externalId: id != null ? String(id) : undefined };
} catch (err) {
return { ok: false, error: `email send failed: ${(err as Error).message}` };
}
},

classifyError(_err: unknown): ErrorClass {
return 'retryable';
},
};
}
18 changes: 18 additions & 0 deletions packages/services/service-messaging/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,23 @@ export type {
// Channel seam
export { createInboxChannel, INBOX_OBJECT, RECEIPT_OBJECT } from './inbox-channel.js';
export type { InboxChannelOptions } from './inbox-channel.js';
export { createEmailChannel, USER_OBJECT as EMAIL_USER_OBJECT } from './email-channel.js';
export type { EmailChannelOptions, EmailSenderSurface } from './email-channel.js';

// Templates + renderer (ADR-0030 P3)
export {
NotificationTemplateStore,
renderNotification,
interpolate,
TEMPLATE_OBJECT,
DEFAULT_LOCALE,
} from './template-renderer.js';
export type {
NotificationTemplateRow,
RenderedNotification,
RenderInput,
NotificationTemplateStoreOptions,
} from './template-renderer.js';
export type {
MessagingChannel,
MessagingChannelContext,
Expand Down Expand Up @@ -92,4 +109,5 @@ export {
NotificationDelivery,
NotificationPreference,
NotificationSubscription,
NotificationTemplate,
} from './objects/index.js';
Loading