Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .changeset/adr-0030-p1-reliable-delivery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
---
"@objectstack/service-messaging": minor
---

ADR-0030 P1 — reliable delivery + RecipientResolver.

**RecipientResolver** — the single home for audience → user-id expansion, wired
into `MessagingService.emit()`. Queries the same identity/membership model
`plugin-sharing` uses (directly via the data engine, no backward plugin
dependency):
- `role:<name>` → `sys_member` rows (tenant-scoped)
- `team:<id>` → `sys_team_member` rows
- `owner_of:<obj>:<id>` / `{ ownerOf }` → the record's owner/assignee field
- `<email>` → `sys_user` (verbatim fallback on miss); `user:<id>` / bare id → id

Best-effort: a failed directory lookup yields 0 recipients for that spec rather
than throwing. The inbox channel's email→id fallback moved here — the channel
now keys rows by the already-resolved recipient.

**Reliable delivery outbox + dispatcher** (mirrors `plugin-webhooks`):
- New `sys_notification_delivery` outbox object (L4) — one row per
`(event × recipient × channel)`; `pending|in_flight|success|failed|dead|suppressed`
state machine; unique `(notification_id, recipient_id, channel)` enqueue dedup.
- `INotificationOutbox` with `SqlNotificationOutbox` + `MemoryNotificationOutbox`
backends; atomic claim (`pending → in_flight`) + stale-in_flight reaping.
- `NotificationDispatcher` — interval loop over partitions, each guarded by a
per-partition cluster lock (single-node always-grant fallback when no cluster
service); sends via the channel and acks with exponential backoff + jitter;
dead-letters once the retry budget is exhausted.
- `emit()` enqueues `pending` deliveries when an outbox is attached; otherwise it
fans out inline (the P0 behavior). `MessagingServicePlugin` wires the outbox +
dispatcher at `kernel:ready` and registers the new object.

A failed channel send now retries and is observable on the delivery row;
duplicate enqueue is idempotent. Backoff/classification and clocks are injectable
for deterministic tests.
67 changes: 67 additions & 0 deletions packages/services/service-messaging/src/backoff.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import type { AckResult } from './outbox.js';
import type { SendResult, ErrorClass } from './channel.js';

/**
* Stable, framework-free partition hash (32-bit FNV-1a). Both the dispatcher
* and the outbox `claim()` filter on it, so it must be a single shared helper.
* Same implementation as `plugin-webhooks`.
*/
export function hashPartition(key: string, count: number): number {
if (count <= 0) throw new Error('partition count must be > 0');
let h = 0x811c9dc5;
for (let i = 0; i < key.length; i++) {
h ^= key.charCodeAt(i);
h = Math.imul(h, 0x01000193);
}
return Math.abs(h | 0) % count;
}

/**
* Exponential retry schedule with jitter. Returns the delay (ms) before the
* next attempt given how many attempts have already happened, or `null` once
* the budget is exhausted (→ dead).
*
* attempt 1 fails → ~1s · 2 → ~10s · 3 → ~1m · 4 → ~10m · 5 → ~1h · 6+ → dead
*/
export function nextRetryDelayMs(attemptsSoFar: number, rng: () => number = Math.random): number | null {
const SCHEDULE = [1_000, 10_000, 60_000, 600_000, 3_600_000];
if (attemptsSoFar < 1 || attemptsSoFar > SCHEDULE.length) return null;
const base = SCHEDULE[attemptsSoFar - 1];
const jitter = 0.8 + rng() * 0.4; // ∈ [0.8, 1.2)
return Math.floor(base * jitter);
}

/**
* Turn a channel `send()` outcome into an {@link AckResult}, applying the retry
* schedule on retriable failures.
*
* - `ok` → success.
* - `errorClass` of `permanent` → dead immediately (no point retrying).
* - `errorClass` of `invalid_recipient` → suppressed (not our transport's fault).
* - otherwise (retryable / unknown) → schedule a retry, or dead once the budget
* is exhausted.
*/
export function classifyDeliveryAttempt(
result: SendResult,
errorClass: ErrorClass | undefined,
attemptsSoFar: number,
now: number = Date.now(),
rng?: () => number,
): AckResult {
if (result.ok) return { success: true };

if (errorClass === 'invalid_recipient') {
return { success: false, error: result.error, suppressed: true };
}
if (errorClass === 'permanent') {
return { success: false, error: result.error, dead: true };
}

const delay = nextRetryDelayMs(attemptsSoFar + 1, rng);
if (delay === null) {
return { success: false, error: result.error, dead: true };
}
return { success: false, error: result.error, nextAttemptAt: now + delay };
}
157 changes: 157 additions & 0 deletions packages/services/service-messaging/src/dispatcher.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import { describe, it, expect } from 'vitest';
import { MemoryNotificationOutbox } from './memory-outbox.js';
import { NotificationDispatcher } from './dispatcher.js';
import { classifyDeliveryAttempt, nextRetryDelayMs } from './backoff.js';
import type { MessagingChannel, SendResult } from './channel.js';

function silentCtx() {
return { logger: { info: () => {}, warn: () => {}, error: () => {} } };
}

/** A channel whose send() outcome is scripted per call. */
function scriptedChannel(id: string, results: SendResult[]): { channel: MessagingChannel; calls: number } {
const state = { calls: 0 };
const channel: MessagingChannel = {
id,
async send() {
const r = results[Math.min(state.calls, results.length - 1)];
state.calls += 1;
return r;
},
classifyError: () => 'retryable',
};
return { channel, get calls() { return state.calls; } } as any;
}

function dispatcher(
outbox: MemoryNotificationOutbox,
channels: MessagingChannel[],
rng = () => 0.5,
now?: () => number,
) {
const registry = {
getChannel: (cid: string) => channels.find((c) => c.id === cid),
};
return new NotificationDispatcher({
nodeId: 'node-test',
outbox,
channels: registry,
channelContext: silentCtx(),
rng,
now,
intervalMs: 10_000, // we drive ticks manually
});
}

describe('nextRetryDelayMs', () => {
it('follows the schedule and dead-letters after the budget', () => {
expect(nextRetryDelayMs(1, () => 0)).toBe(800); // 1000 * 0.8
expect(nextRetryDelayMs(5, () => 0)).toBe(2_880_000); // 3_600_000 * 0.8
expect(nextRetryDelayMs(6)).toBeNull(); // exhausted
expect(nextRetryDelayMs(0)).toBeNull();
});
});

describe('classifyDeliveryAttempt', () => {
it('success short-circuits', () => {
expect(classifyDeliveryAttempt({ ok: true }, undefined, 0)).toEqual({ success: true });
});
it('permanent → dead, invalid_recipient → suppressed', () => {
expect(classifyDeliveryAttempt({ ok: false, error: 'x' }, 'permanent', 0)).toMatchObject({ dead: true });
expect(classifyDeliveryAttempt({ ok: false, error: 'x' }, 'invalid_recipient', 0)).toMatchObject({ suppressed: true });
});
it('retryable schedules nextAttemptAt until the budget is exhausted', () => {
const r = classifyDeliveryAttempt({ ok: false, error: 'x' }, 'retryable', 0, 1000, () => 0);
expect(r).toMatchObject({ success: false, nextAttemptAt: 1800 });
const dead = classifyDeliveryAttempt({ ok: false, error: 'x' }, 'retryable', 5, 1000, () => 0);
expect(dead).toMatchObject({ dead: true });
});
});

describe('NotificationDispatcher', () => {
it('delivers a pending row through its channel and marks it success', async () => {
const outbox = new MemoryNotificationOutbox(1);
const { channel } = scriptedChannel('inbox', [{ ok: true, externalId: 'inbox_1' }]);
const seen: any[] = [];
const sendCh: MessagingChannel = {
id: 'inbox',
async send(_ctx, d) { seen.push(d); return { ok: true }; },
};
await outbox.enqueue({ notificationId: 'e1', recipientId: 'u1', channel: 'inbox', payload: { title: 'Hi', body: 'there' } });

const d = dispatcher(outbox, [sendCh]);
await d.tick();

expect(seen).toHaveLength(1);
expect(seen[0].recipient).toBe('u1');
expect(seen[0].notification.title).toBe('Hi');
const rows = await outbox.list();
expect(rows[0].status).toBe('success');
expect(rows[0].attempts).toBe(1);
void channel;
});

it('retries a failed send (status back to pending with a future nextAttemptAt)', async () => {
const outbox = new MemoryNotificationOutbox(1);
const failing: MessagingChannel = {
id: 'inbox',
async send() { return { ok: false, error: 'db down' }; },
classifyError: () => 'retryable',
};
await outbox.enqueue({ notificationId: 'e1', recipientId: 'u1', channel: 'inbox', payload: { title: 'Hi' } });

const d = dispatcher(outbox, [failing]);
await d.tick();

const [row] = await outbox.list();
expect(row.status).toBe('pending');
expect(row.attempts).toBe(1);
expect(row.nextAttemptAt).toBeGreaterThan(Date.now());
expect(row.error).toBe('db down');
});

it('dead-letters a row whose channel is not registered', async () => {
const outbox = new MemoryNotificationOutbox(1);
await outbox.enqueue({ notificationId: 'e1', recipientId: 'u1', channel: 'sms', payload: { title: 'Hi' } });
const d = dispatcher(outbox, []); // no channels
await d.tick();
const [row] = await outbox.list();
expect(row.status).toBe('dead');
expect(row.error).toContain("channel 'sms' not registered");
});

it('eventually dead-letters after the retry budget is exhausted', async () => {
// Shared injectable clock: advance past the largest backoff each round so
// the row is ready, deterministically, without real timers.
let t = 1_000;
const clock = () => t;
const outbox = new MemoryNotificationOutbox(1, clock);
const failing: MessagingChannel = {
id: 'inbox',
async send() { return { ok: false, error: 'always fails' }; },
classifyError: () => 'retryable',
};
const id = await outbox.enqueue({ notificationId: 'e1', recipientId: 'u1', channel: 'inbox', payload: { title: 'Hi' } });
const d = dispatcher(outbox, [failing], () => 0.5, clock);

// 6 attempts: 5 scheduled retries then dead.
for (let i = 0; i < 6; i++) {
t += 5_000_000; // > max backoff (3.6M × 1.2)
await d.tick();
}
const [row] = await outbox.list();
expect(row.status).toBe('dead');
expect(row.attempts).toBe(6);
expect(row.id).toBe(id);
});

it('dedups enqueue on (notification, recipient, channel)', async () => {
const outbox = new MemoryNotificationOutbox(1);
const a = await outbox.enqueue({ notificationId: 'e1', recipientId: 'u1', channel: 'inbox', payload: {} });
const b = await outbox.enqueue({ notificationId: 'e1', recipientId: 'u1', channel: 'inbox', payload: {} });
expect(a).toBe(b);
expect(await outbox.list()).toHaveLength(1);
});
});
Loading