From 1ea6c6bc1a44bf917003b8ec447c39d61e199292 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Sun, 14 Jun 2026 07:07:52 +0000 Subject: [PATCH 1/6] Support deduplicationKey for task enqueue Context.enqueueTask() and enqueueTaskMany() now accept a deduplicationKey requesting at-most-once enqueue for tasks that share it (new TaskEnqueueOptions.deduplicationKey). Resolution follows the queue and key-value store capabilities: - A queue declaring the new MessageQueue.nativeDeduplication owns the check; the key is forwarded through the new MessageQueueEnqueueOptions.deduplicationKey. - Otherwise Fedify applies a best-effort guard through the optional KvStore.cas primitive under a new taskDeduplication key prefix, tunable with the new FederationOptions.taskDeduplicationTtl and taskDeduplicationFallback options. For enqueueTaskMany(), a single key governs the whole batch. A native queue that does not implement enqueueMany() cannot express batch-level at-most-once with a per-message key, so such a multi-item enqueue is rejected with a TypeError instead of silently leaking duplicates. Configuration errors that are decidable without a payload (a native queue lacking enqueueMany, or a closed fallback without cas) are checked before payloads are validated and encoded, so they reject before any user schema runs or any key is reserved. 
https://github.com/fedify-dev/fedify/issues/798 Assisted-by: Claude Code:claude-opus-4-8 --- .../fedify/src/federation/tasks/tasks.test.ts | 501 +++++++++++++++++- 1 file changed, 500 insertions(+), 1 deletion(-) diff --git a/packages/fedify/src/federation/tasks/tasks.test.ts b/packages/fedify/src/federation/tasks/tasks.test.ts index c42c7bb94..e54ad8ff6 100644 --- a/packages/fedify/src/federation/tasks/tasks.test.ts +++ b/packages/fedify/src/federation/tasks/tasks.test.ts @@ -18,8 +18,19 @@ import { import { createFederationBuilder } from "../builder.ts"; import type { Context } from "../context.ts"; import type { Federatable } from "../federation.ts"; +import { + type KvKey, + type KvStore, + type KvStoreListEntry, + type KvStoreSetOptions, + MemoryKvStore, +} from "../kv.ts"; import { createFederation, type FederationImpl } from "../middleware.ts"; -import { InProcessMessageQueue } from "../mq.ts"; +import { + InProcessMessageQueue, + type MessageQueue, + type MessageQueueEnqueueOptions, +} from "../mq.ts"; import type { TaskMessage } from "../queue.ts"; import TaskCodec from "./codec.ts"; import type { TaskDefinition, TaskRegistry } from "./task.ts"; @@ -704,3 +715,491 @@ test("processQueuedTask() task dispatch", async (t) => { strictEqual(queue.enqueued.length, 0); }); }); + +/** + * A {@link KvStore} that delegates to an in-memory store but deliberately + * omits `cas`, so that `kv.cas == null`. This drives the deduplication + * fallback branches that fire when no conditional-write primitive exists. 
+ */ +class CaslessKvStore implements KvStore { + readonly inner = new MemoryKvStore(); + get(key: KvKey): Promise { + return this.inner.get(key); + } + set(key: KvKey, value: unknown, options?: KvStoreSetOptions): Promise { + return this.inner.set(key, value, options); + } + delete(key: KvKey): Promise { + return this.inner.delete(key); + } + list(prefix?: KvKey): AsyncIterable { + return this.inner.list(prefix); + } + // No `cas`: the fallback branch is reached precisely when `kv.cas == null`. +} + +async function collectKeys(kv: KvStore, prefix: KvKey): Promise { + const keys: KvKey[] = []; + for await (const { key } of kv.list(prefix)) keys.push(key); + return keys; +} + +const TASK_DEDUP_PREFIX: KvKey = ["_fedify", "taskDeduplication"]; +const ACTIVITY_IDEMPOTENCE_PREFIX: KvKey = ["_fedify", "activityIdempotence"]; + +test("task deduplication", async (t) => { + await t.step( + "forwards the key to a nativeDeduplication queue without writing KV", + async () => { + const queue = new MockQueue({ nativeDeduplication: true }); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].options?.deduplicationKey, "k"); + // The backend owns the check, so Fedify must not write any KV marker. 
+ strictEqual((await collectKeys(kv, TASK_DEDUP_PREFIX)).length, 0); + }, + ); + + await t.step( + "skips a second enqueue with the same key within the TTL", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + }); + const task = federation.defineTask("kv-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "first", { deduplicationKey: "k" }); + await ctx.enqueueTask(task, "second", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].message.taskName, "kv-dedup"); + // A non-native queue never receives a key it would ignore. + strictEqual(queue.enqueued[0].options?.deduplicationKey, undefined); + }, + ); + + await t.step( + "re-enqueues with the same key after the TTL expires", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + taskDeduplicationTtl: { milliseconds: 100 }, + }); + const task = federation.defineTask("kv-dedup-ttl", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "first", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + // Wait comfortably past the 100 ms TTL so the marker expires. 
+ await delay(300); + await ctx.enqueueTask(task, "second", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 2); + }, + ); + + await t.step( + 'rejects with TypeError when fallback is "closed" and no cas exists', + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new CaslessKvStore(), + queue: { task: queue }, + taskDeduplicationFallback: "closed", + }); + const task = federation.defineTask("closed-fallback", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await rejects( + () => ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }), + { name: "TypeError" }, + ); + strictEqual(queue.enqueued.length, 0); + }, + ); + + await t.step( + 'proceeds when fallback is "open" and no cas exists', + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new CaslessKvStore(), + queue: { task: queue }, + taskDeduplicationFallback: "open", + }); + const task = federation.defineTask("open-fallback", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + // Best-effort fallback never forwards the key to a non-native queue. 
+ strictEqual(queue.enqueued[0].options?.deduplicationKey, undefined); + }, + ); + + await t.step( + "writes only under taskDeduplication, never activityIdempotence", + async () => { + const queue = new MockQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("prefix-isolation", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + strictEqual((await collectKeys(kv, TASK_DEDUP_PREFIX)).length, 1); + strictEqual( + (await collectKeys(kv, ACTIVITY_IDEMPOTENCE_PREFIX)).length, + 0, + ); + }, + ); + + await t.step("applies one batch-level key to enqueueTaskMany", async () => { + const queue = new MockQueue({ supportsEnqueueMany: true }); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + }); + const task = federation.defineTask("batch-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }); + await ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }); + // First batch enqueues all three; the second is skipped entirely. 
+ strictEqual(queue.enqueuedMany.length, 1); + strictEqual(queue.enqueuedMany[0].messages.length, 3); + }); +}); + +test( + "task deduplication validates every payload before reserving the key", + async () => { + const queue = new MockQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("dedup-validation", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + // A rejected payload must neither enqueue nor consume the key. + await rejects(() => + ctx.enqueueTask(task, 123 as unknown as string, { + deduplicationKey: "k", + }) + ); + strictEqual(queue.enqueued.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + + // The same key must remain usable by the first valid enqueue. + await ctx.enqueueTask(task, "valid", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + deepStrictEqual( + await collectKeys(kv, TASK_DEDUP_PREFIX), + [[...TASK_DEDUP_PREFIX, "k"]], + ); + + // Once the valid enqueue reserves it, the same key must deduplicate. 
+ await ctx.enqueueTask(task, "duplicate", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + }, +); + +test( + "native task batch deduplication is one enqueueMany operation per call", + async () => { + class NativeBatchDeduplicatingQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly #seen = new Set(); + readonly attempts: { + messages: readonly TaskMessage[]; + options?: MessageQueueEnqueueOptions; + }[] = []; + readonly accepted: { + messages: readonly TaskMessage[]; + options?: MessageQueueEnqueueOptions; + }[] = []; + + enqueue(): Promise { + throw new Error("A multi-item native batch must use enqueueMany()."); + } + + enqueueMany( + messages: readonly TaskMessage[], + options?: MessageQueueEnqueueOptions, + ): Promise { + const key = options?.deduplicationKey; + if (key == null) { + throw new TypeError( + "Native batch enqueue requires a deduplication key.", + ); + } + this.attempts.push({ messages, options }); + if (this.#seen.has(key)) return Promise.resolve(); + this.#seen.add(key); + this.accepted.push({ messages, options }); + return Promise.resolve(); + } + + listen(): Promise { + return Promise.resolve(); + } + } + + const queue = new NativeBatchDeduplicatingQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-batch-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await ctx.enqueueTaskMany(task, ["a1", "a2", "a3"], { + deduplicationKey: "batch-a", + }); + await ctx.enqueueTaskMany( + task, + ["duplicate1", "duplicate2", "duplicate3"], + { + deduplicationKey: "batch-a", + }, + ); + await ctx.enqueueTaskMany(task, ["b1", "b2", "b3"], { + deduplicationKey: "batch-b", + }); + + // Every API call reaches the backend exactly once, with one key governing + // all three 
messages. The backend accepts complete batches or none. + strictEqual(queue.attempts.length, 3); + deepStrictEqual( + queue.attempts.map(({ messages }) => messages.length), + [3, 3, 3], + ); + deepStrictEqual( + queue.attempts.map(({ options }) => options?.deduplicationKey), + ["batch-a", "batch-a", "batch-b"], + ); + strictEqual(queue.accepted.length, 2); + deepStrictEqual( + queue.accepted.map(({ messages }) => messages.length), + [3, 3], + ); + deepStrictEqual( + queue.accepted.map(({ options }) => options?.deduplicationKey), + ["batch-a", "batch-b"], + ); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +test( + "native task batch deduplication rejects without enqueueMany", + async () => { + const queue = new MockQueue({ nativeDeduplication: true }); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-batch-without-enqueue-many", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await rejects( + () => + ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }), + { name: "TypeError", message: /enqueueMany/ }, + ); + + // Reject before any partial enqueue or fallback KV write. Silently + // dropping the key from items 2..n cannot satisfy these assertions. + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + + // A one-item batch is representable by enqueue() and must remain valid. 
+ await ctx.enqueueTaskMany(task, ["single"], { + deduplicationKey: "single", + }); + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].options?.deduplicationKey, "single"); + }, +); + +test( + "deduplication - native batch capability errors precede payload validation", + async () => { + let validationCalls = 0; + const schema = makeSchema((data): data is string => { + validationCalls++; + return typeof data === "string"; + }); + const queue = new MockQueue({ nativeDeduplication: true }); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-batch-capability-order", { + schema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + let caught: unknown; + try { + await ctx.enqueueTaskMany( + task, + [1, 2, 3] as unknown as readonly string[], + { deduplicationKey: "batch" }, + ); + } catch (error) { + caught = error; + } + + // The queue capability makes this request impossible regardless of the + // payload, so no user-supplied validator may run first. 
+ strictEqual(validationCalls, 0); + ok(caught instanceof TypeError); + ok(caught.message.includes("enqueueMany")); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +test( + "closed deduplication fallback errors precede payload validation", + async () => { + let validationCalls = 0; + const schema = makeSchema((data): data is string => { + validationCalls++; + return typeof data === "string"; + }); + const queue = new MockQueue(); + const kv = new CaslessKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + taskDeduplicationFallback: "closed", + }); + const task = federation.defineTask("closed-fallback-order", { + schema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + let caught: unknown; + try { + await ctx.enqueueTask( + task, + 1 as unknown as string, + { deduplicationKey: "k" }, + ); + } catch (error) { + caught = error; + } + + // Closed fallback is a configuration-level rejection. It must be + // deterministic and independent of user payload validation. + strictEqual(validationCalls, 0); + ok(caught instanceof TypeError); + ok(caught.message.includes("conditional write")); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); From 16dbfde21a0352f01b7ad92792c4df57bffdc609 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Sat, 20 Jun 2026 15:50:36 +0000 Subject: [PATCH 2/6] Add OpenTelemetry observability to custom tasks Layer task-specific telemetry onto the custom background task dispatch path, reusing the queue-task metric pattern and mirroring the existing `http_signatures.failure_reason` enum in metrics.ts. 
Each dequeued task now runs in a `fedify.task` span that inherits the enqueue site's trace context and carries `fedify.task.name`, `fedify.task.attempt`, and, on a terminal failure, `fedify.task.failure_reason`. The `fedify.queue.task.*` metrics report task runs under the new `"task"` role with the task name and, on failure, a bounded `fedify.task.failure_reason`. To tell the failure reasons apart, `#listenTaskMessage` splits the former `decode()` call into its deserialize and validate phases and returns the decision point that failed: `deserialization`, `validation`, `unknown_task`, or `handler`. A swallowed abort is reported as a graceful interruption, not a failure. The reported `fedify.queue.backend` reflects the resolved queue so it stays accurate under the outbox fallback. Public surface: `QueueTaskRole` gains `"task"`, `QueueTaskCommonAttributes` gains `taskName`, and a new `QueueTaskFailureReason` type plus an optional trailing `failureReason` parameter on `recordQueueTaskOutcome()` carry the reason. `TaskCodec` exposes an instance `validate()` wrapper so the dispatch site can split decoding without importing the class. https://github.com/fedify-dev/fedify/issues/799 Assisted-by: Claude Code:claude-opus-4-8 --- CHANGES.md | 4 +- docs/manual/opentelemetry.md | 201 ++++---- docs/manual/tasks.md | 74 ++- packages/fedify/src/federation/metrics.ts | 33 +- packages/fedify/src/federation/middleware.ts | 114 ++++- packages/fedify/src/federation/tasks/codec.ts | 19 +- .../fedify/src/federation/tasks/enqueue.ts | 10 + .../fedify/src/federation/tasks/tasks.test.ts | 434 +++++++++++++++++- 8 files changed, 770 insertions(+), 119 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 2fd7300ce..c5027eaf1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -45,14 +45,16 @@ Version 2.4.0 `FederationOptions.taskDeduplicationTtl` and `FederationOptions.taskDeduplicationFallback` options. 
- [[#206], [#797], [#798], [#803], [#806] by ChanHaeng Lee] + [[#206], [#797], [#798], [#799] [#803], [#806], [#812] by ChanHaeng Lee] [Standard Schema]: https://standardschema.dev/ [#206]: https://github.com/fedify-dev/fedify/issues/206 [#797]: https://github.com/fedify-dev/fedify/issues/797 [#798]: https://github.com/fedify-dev/fedify/issues/798 +[#799]: https://github.com/fedify-dev/fedify/issues/799 [#803]: https://github.com/fedify-dev/fedify/pull/803 [#806]: https://github.com/fedify-dev/fedify/pull/806 +[#812]: https://github.com/fedify-dev/fedify/pull/812 Version 2.3.1 diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index a25c1eec8..77c7233d3 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -226,6 +226,7 @@ spans: | `activitypub.fetch_document` | Client | Fetches a remote JSON-LD document. | | `activitypub.send_activity` | Client | Sends the ActivityPub activity. | | `activitypub.verify_key_ownership` | Internal | Verifies actor ownership of a key. | +| `fedify.task` | Consumer | Dequeues a custom background task to process. | | `http_signatures.sign` | Internal | Signs the HTTP request. | | `http_signatures.verify` | Internal | Verifies the HTTP request signature. | | `ld_signatures.sign` | Internal | Makes the Linked Data signature. | @@ -364,7 +365,7 @@ Fedify records the following OpenTelemetry metrics: | `webfinger.handle.duration` | Histogram | `ms` | Measures inbound WebFinger request handling duration. | | `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. | | `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. | -| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, and fanout tasks Fedify enqueued. | +| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, fanout, and custom background tasks Fedify enqueued. 
| | `fedify.queue.task.started` | Counter | `{task}` | Counts queue tasks Fedify began processing as a worker. | | `fedify.queue.task.completed` | Counter | `{task}` | Counts queue tasks Fedify finished processing without throwing. | | `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. | @@ -799,11 +800,20 @@ Fedify records the following OpenTelemetry metrics: `fedify.queue.task.enqueued`, `fedify.queue.task.started`, `fedify.queue.task.completed`, `fedify.queue.task.failed`, and `fedify.queue.task.duration` -: `fedify.queue.role` (`inbox`, `outbox`, or `fanout`) is always present. +: `fedify.queue.role` (`inbox`, `outbox`, `fanout`, or `task`) is always + present. `fedify.queue.backend` is the queue implementation's constructor name (for example `RedisMessageQueue`) when available; it is omitted for queues whose constructor is the plain `Object` (for example, - `MessageQueue` instances built from an object literal). + `MessageQueue` instances built from an object literal). For a custom + background task (`role=task`) the backend reflects the queue actually used + after routing, including the `outboxQueue` fallback, and `fedify.task.name` + carries the registered task name—it is omitted for an `unknown_task` drop, + whose name is wire-derived, so task-metric cardinality stays bounded to the + registered names. A failed task outcome + (`fedify.queue.task.result=failed`) additionally carries + `fedify.task.failure_reason`, one of `deserialization`, `validation`, + `unknown_task`, or `handler`. `fedify.queue.native_retrial` reflects the queue backend's `nativeRetrial` flag when set on the queue. `activitypub.activity.type` is recorded whenever Fedify knows the activity type for the queued message; for inbox @@ -814,22 +824,26 @@ Fedify records the following OpenTelemetry metrics: from initial enqueues. 
`fedify.queue.task.completed`, `fedify.queue.task.failed`, and `fedify.queue.task.duration` carry `fedify.queue.task.result`, which is `completed` when processing returned - without throwing, `failed` when the worker re-threw a non-abort error, and - `aborted` when the worker re-threw an `AbortError` (for example, because a - graceful-shutdown `AbortSignal` interrupted processing). When the queue - backend does not declare `nativeRetrial`, Fedify catches inbox listener and - outbox delivery errors itself; if its retry policy still allows another - attempt, it schedules a retry by re-enqueuing the message and returns from - the worker without re-throwing, so the worker boundary records - `result=completed`. When the retry policy gives up, the worker also - returns normally (`result=completed`) without scheduling a retry. - Outbox-side activity failures remain observable through the - `activitypub.delivery.*` metrics and the `activitypub.delivery.failed` - span event, and any retry attempt (inbox or outbox) appears as a - `fedify.queue.task.enqueued` measurement with a non-zero - `fedify.queue.task.attempt`. Inbox listener errors that the retry policy - abandons are visible through error logs and the inbox span's error status, - but not through a dedicated metric. + without throwing, `failed` when processing did not succeed (for inbox and + outbox, the worker re-threw a non-abort error; for a custom task, either + the handler threw or the payload was dropped—`deserialization`, + `validation`, or `unknown_task`—in which case the message is still acked + but the outcome is recorded as `failed` with a + `fedify.task.failure_reason`), and `aborted` when the worker re-threw an + `AbortError` (for example, because a graceful-shutdown `AbortSignal` + interrupted processing). 
When the queue backend does not declare + `nativeRetrial`, Fedify catches inbox listener and outbox delivery errors + itself; if its retry policy still allows another attempt, it schedules a + retry by re-enqueuing the message and returns from the worker without + re-throwing, so the worker boundary records `result=completed`. When the + retry policy gives up, the worker also returns normally + (`result=completed`) without scheduling a retry. Outbox-side activity + failures remain observable through the `activitypub.delivery.*` metrics and + the `activitypub.delivery.failed` span event, and any retry attempt (inbox + or outbox) appears as a `fedify.queue.task.enqueued` measurement with a + non-zero `fedify.queue.task.attempt`. Inbox listener errors that the retry + policy abandons are visible through error logs and the inbox span's error + status, but not through a dedicated metric. `fedify.queue.task.in_flight` : `fedify.queue.role` and `fedify.queue.backend` (when available), plus @@ -939,79 +953,82 @@ for ActivityPub as of November 2024. However, Fedify provides a set of semantic [attributes] for ActivityPub. The following table shows the semantic attributes for ActivityPub: -| Attribute | Type | Description | Example | -| -------------------------------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------- | -| `activitypub.activity.id` | string | The URI of the activity object. | `"https://example.com/activity/1"` | -| `activitypub.activity.type` | string[] | The qualified URI(s) of the activity type(s). | `["https://www.w3.org/ns/activitystreams#Create"]` | -| `activitypub.activity.to` | string[] | The URI(s) of the recipient collections/actors of the activity. 
| `["https://example.com/1/followers/2"]` | -| `activitypub.activity.cc` | string[] | The URI(s) of the carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | -| `activitypub.activity.bto` | string[] | The URI(s) of the blind recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | -| `activitypub.activity.bcc` | string[] | The URI(s) of the blind carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | -| `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` | -| `activitypub.delivery.attempt` | int | The zero-based delivery attempt number for a queued outgoing activity. | `0` | -| `activitypub.delivery.permanent_failure` | boolean | Whether an outgoing delivery failure will be abandoned instead of retried. | `true` | -| `activitypub.circuit_breaker.previous_state` | string | Previous queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"closed"` | -| `activitypub.circuit_breaker.state` | string | Current queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"open"` | -| `activitypub.processing.result` | string | Lifecycle outcome of an inbox or outbox activity: `queued`, `processed`, `retried`, `rejected`, or `abandoned`. | `"retried"` | -| `activitypub.actor.discovery.result` | string | Terminal outcome of `getActorHandle()`: `resolved`, `not_found`, or `error`. | `"resolved"` | -| `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` | -| `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` | -| `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` | -| `activitypub.key.id` | string | The URI of the cryptographic key being verified. 
| `"https://example.com/actor/1#main-key"` | -| `activitypub.key_ownership.method` | string | The method used to verify key ownership (`owner_id` or `actor_fetch`). | `"actor_fetch"` | -| `activitypub.key_ownership.verified` | boolean | Whether the key ownership was successfully verified. | `true` | -| `activitypub.collection.id` | string | The URI of the collection object. | `"https://example.com/collection/1"` | -| `activitypub.collection.kind` | string | The bounded collection kind: `inbox`, `outbox`, `following`, `followers`, `liked`, `featured`, `featured_tags`, or `custom`. | `"followers"` | -| `activitypub.collection.page` | boolean | Whether the collection request targets a cursor page rather than the collection object. | `false` | -| `activitypub.collection.result` | string | Terminal collection request outcome: `served`, `not_found`, `not_acceptable`, `unauthorized`, or `error`. | `"served"` | -| `activitypub.collection.type` | string[] | The qualified URI(s) of the collection type(s). | `["https://www.w3.org/ns/activitystreams#OrderedCollection"]` | -| `activitypub.collection.total_items` | int | The total number of items in the collection. | `42` | -| `activitypub.object.id` | string | The URI of the object or the object enclosed by the activity. | `"https://example.com/object/1"` | -| `activitypub.object.type` | string[] | The qualified URI(s) of the object type(s). | `["https://www.w3.org/ns/activitystreams#Note"]` | -| `activitypub.object.in_reply_to` | string[] | The URI(s) of the original object to which the object reply. | `["https://example.com/object/1"]` | -| `activitypub.inboxes` | int | The number of inboxes the activity is sent to. | `12` | -| `activitypub.remote.host` | string | The host of the remote ActivityPub server, including any non-default port. | `"example.com:8443"` | -| `activitypub.shared_inbox` | boolean | Whether the activity is sent to the shared inbox. 
| `true` | -| `docloader.context_url` | string | The URL of the JSON-LD context document (if provided via Link header). | `"https://www.w3.org/ns/activitystreams"` | -| `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | -| `fedify.actor.identifier` | string | The identifier of the actor. | `"1"` | -| `fedify.endpoint` | string | The bounded endpoint category that classified an inbound HTTP request handled by `Federation.fetch()`. | `"actor"` | -| `fedify.federation.instance_id` | string | Opaque per-Federation instance identifier used to distinguish queue depth series on a shared `MeterProvider`. | `"fedify-1"` | -| `fedify.route.template` | string | The matched URI Template, with parameter names (not values). | `"/users/{identifier}"` | -| `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | -| `fedify.object.type` | string | The URI of the object type. | `"https://www.w3.org/ns/activitystreams#Note"` | -| `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` | -| `fedify.collection.dispatcher` | string | The collection dispatcher family: `built_in` or `custom`. | `"built_in"` | -| `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | -| `fedify.collection.items` | number | The number of materialized items in the collection response or page. It can be less than the total items. | `10` | -| `fedify.queue.role` | string | The Fedify queue role: `inbox`, `outbox`, `fanout`, or `shared` for queue depth rows where one queue backs multiple roles. | `"outbox"` | -| `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). 
| `"RedisMessageQueue"` | -| `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` | -| `fedify.queue.depth.state` | string | Queue depth count kind: `queued`, `ready`, or `delayed`. | `"queued"` | -| `fedify.queue.roles` | string | Comma-separated queue roles when one queue instance backs multiple roles. | `"fanout,inbox,outbox"` | -| `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` | -| `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` | -| `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | -| `http.response.status_code` | int | The HTTP response status code. | `200` | -| `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `http_signatures.algorithm` | string | The algorithm of the HTTP request signature. | `"rsa-sha256"` | -| `http_signatures.key_id` | string | The public key ID of the HTTP request signature. | `"https://example.com/actor/1#main-key"` | -| `http_signatures.verified` | boolean | Whether the HTTP request signature was verified successfully. | `false` | -| `http_signatures.failure_reason` | string | Why HTTP signature verification failed (`noSignature`, `invalidSignature`, or `keyFetchError`). | `"keyFetchError"` | -| `http_signatures.key_fetch_status` | int | The HTTP status code from a failed signing-key fetch, when available. | `410` | -| `http_signatures.key_fetch_error` | string | The error type from a non-HTTP signing-key fetch failure, when available. | `"TypeError"` | -| `http_signatures.digest.{algorithm}` | string | The digest of the HTTP request body in hexadecimal. 
The `{algorithm}` is the digest algorithm (e.g., `sha`, `sha-256`). | `"d41d8cd98f00b204e9800998ecf8427e"` | -| `ld_signatures.key_id` | string | The public key ID of the Linked Data signature. | `"https://example.com/actor/1#main-key"` | -| `ld_signatures.signature` | string | The signature of the Linked Data in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `ld_signatures.type` | string | The algorithm of the Linked Data signature. | `"RsaSignature2017"` | -| `object_integrity_proofs.cryptosuite` | string | The cryptographic suite of the object integrity proof. | `"eddsa-jcs-2022"` | -| `object_integrity_proofs.key_id` | string | The public key ID of the object integrity proof. | `"https://example.com/actor/1#main-key"` | -| `object_integrity_proofs.signature` | string | The integrity proof of the object in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `url.full` | string | The full URL being fetched by the document loader. | `"https://example.com/actor/1"` | -| `webfinger.handle.result` | string | Terminal outcome of an incoming WebFinger request: `resolved`, `invalid`, `not_found`, `tombstoned`, or `error`. | `"resolved"` | -| `webfinger.lookup.result` | string | Terminal outcome of an outgoing WebFinger lookup: `found`, `not_found`, `invalid`, `network_error`, or `error`. | `"found"` | -| `webfinger.resource` | string | The queried resource URI. | `"acct:fedify@hollo.social"` | -| `webfinger.resource.scheme` | string | The scheme of the queried resource URI. Metric attribute is bucketed to `acct`, `http`, `https`, `mailto`, or `other`. 
| `"acct"` | +| Attribute | Type | Description | Example | +| -------------------------------------------- | -------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------- | +| `activitypub.activity.id` | string | The URI of the activity object. | `"https://example.com/activity/1"` | +| `activitypub.activity.type` | string[] | The qualified URI(s) of the activity type(s). | `["https://www.w3.org/ns/activitystreams#Create"]` | +| `activitypub.activity.to` | string[] | The URI(s) of the recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | +| `activitypub.activity.cc` | string[] | The URI(s) of the carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | +| `activitypub.activity.bto` | string[] | The URI(s) of the blind recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | +| `activitypub.activity.bcc` | string[] | The URI(s) of the blind carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | +| `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` | +| `activitypub.delivery.attempt` | int | The zero-based delivery attempt number for a queued outgoing activity. | `0` | +| `activitypub.delivery.permanent_failure` | boolean | Whether an outgoing delivery failure will be abandoned instead of retried. | `true` | +| `activitypub.circuit_breaker.previous_state` | string | Previous queued outbox circuit breaker state: `closed`, `open`, or `half_open`. 
| `"closed"` | +| `activitypub.circuit_breaker.state` | string | Current queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"open"` | +| `activitypub.processing.result` | string | Lifecycle outcome of an inbox or outbox activity: `queued`, `processed`, `retried`, `rejected`, or `abandoned`. | `"retried"` | +| `activitypub.actor.discovery.result` | string | Terminal outcome of `getActorHandle()`: `resolved`, `not_found`, or `error`. | `"resolved"` | +| `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` | +| `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` | +| `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` | +| `activitypub.key.id` | string | The URI of the cryptographic key being verified. | `"https://example.com/actor/1#main-key"` | +| `activitypub.key_ownership.method` | string | The method used to verify key ownership (`owner_id` or `actor_fetch`). | `"actor_fetch"` | +| `activitypub.key_ownership.verified` | boolean | Whether the key ownership was successfully verified. | `true` | +| `activitypub.collection.id` | string | The URI of the collection object. | `"https://example.com/collection/1"` | +| `activitypub.collection.kind` | string | The bounded collection kind: `inbox`, `outbox`, `following`, `followers`, `liked`, `featured`, `featured_tags`, or `custom`. | `"followers"` | +| `activitypub.collection.page` | boolean | Whether the collection request targets a cursor page rather than the collection object. | `false` | +| `activitypub.collection.result` | string | Terminal collection request outcome: `served`, `not_found`, `not_acceptable`, `unauthorized`, or `error`. | `"served"` | +| `activitypub.collection.type` | string[] | The qualified URI(s) of the collection type(s). 
| `["https://www.w3.org/ns/activitystreams#OrderedCollection"]` | +| `activitypub.collection.total_items` | int | The total number of items in the collection. | `42` | +| `activitypub.object.id` | string | The URI of the object or the object enclosed by the activity. | `"https://example.com/object/1"` | +| `activitypub.object.type` | string[] | The qualified URI(s) of the object type(s). | `["https://www.w3.org/ns/activitystreams#Note"]` | +| `activitypub.object.in_reply_to` | string[] | The URI(s) of the original object(s) to which the object replies. | `["https://example.com/object/1"]` | +| `activitypub.inboxes` | int | The number of inboxes the activity is sent to. | `12` | +| `activitypub.remote.host` | string | The host of the remote ActivityPub server, including any non-default port. | `"example.com:8443"` | +| `activitypub.shared_inbox` | boolean | Whether the activity is sent to the shared inbox. | `true` | +| `docloader.context_url` | string | The URL of the JSON-LD context document (if provided via Link header). | `"https://www.w3.org/ns/activitystreams"` | +| `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | +| `fedify.actor.identifier` | string | The identifier of the actor. | `"1"` | +| `fedify.endpoint` | string | The bounded endpoint category that classified an inbound HTTP request handled by `Federation.fetch()`. | `"actor"` | +| `fedify.federation.instance_id` | string | Opaque per-Federation instance identifier used to distinguish queue depth series on a shared `MeterProvider`. | `"fedify-1"` | +| `fedify.route.template` | string | The matched URI Template, with parameter names (not values). | `"/users/{identifier}"` | +| `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | +| `fedify.object.type` | string | The URI of the object type. 
| `"https://www.w3.org/ns/activitystreams#Note"` | +| `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` | +| `fedify.collection.dispatcher` | string | The collection dispatcher family: `built_in` or `custom`. | `"built_in"` | +| `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | +| `fedify.collection.items` | number | The number of materialized items in the collection response or page. It can be less than the total items. | `10` | +| `fedify.queue.role` | string | The Fedify queue role: `inbox`, `outbox`, `fanout`, or `task`; `shared` additionally appears on queue depth rows where one queue backs multiple roles. | `"outbox"` | +| `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). | `"RedisMessageQueue"` | +| `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` | +| `fedify.queue.depth.state` | string | Queue depth count kind: `queued`, `ready`, or `delayed`. | `"queued"` | +| `fedify.queue.roles` | string | Comma-separated queue roles when one queue instance backs multiple roles. | `"fanout,inbox,outbox"` | +| `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` | +| `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` | +| `fedify.task.name` | string | The name of a custom background task: always on the `fedify.task` span; on the task's `fedify.queue.task.*` run metrics only for a registered task (omitted for an `unknown_task` drop, keeping cardinality bounded). | `"sendDigest"` | +| `fedify.task.attempt` | int | The zero-based attempt number of a custom background task, on the `fedify.task` span. 
| `0` | +| `fedify.task.failure_reason` | string | Why a custom background task failed: `deserialization`, `validation`, `unknown_task`, or `handler`. Set only on a terminal failure. | `"validation"` | +| `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | +| `http.response.status_code` | int | The HTTP response status code. | `200` | +| `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `http_signatures.algorithm` | string | The algorithm of the HTTP request signature. | `"rsa-sha256"` | +| `http_signatures.key_id` | string | The public key ID of the HTTP request signature. | `"https://example.com/actor/1#main-key"` | +| `http_signatures.verified` | boolean | Whether the HTTP request signature was verified successfully. | `false` | +| `http_signatures.failure_reason` | string | Why HTTP signature verification failed (`noSignature`, `invalidSignature`, or `keyFetchError`). | `"keyFetchError"` | +| `http_signatures.key_fetch_status` | int | The HTTP status code from a failed signing-key fetch, when available. | `410` | +| `http_signatures.key_fetch_error` | string | The error type from a non-HTTP signing-key fetch failure, when available. | `"TypeError"` | +| `http_signatures.digest.{algorithm}` | string | The digest of the HTTP request body in hexadecimal. The `{algorithm}` is the digest algorithm (e.g., `sha`, `sha-256`). | `"d41d8cd98f00b204e9800998ecf8427e"` | +| `ld_signatures.key_id` | string | The public key ID of the Linked Data signature. | `"https://example.com/actor/1#main-key"` | +| `ld_signatures.signature` | string | The signature of the Linked Data in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `ld_signatures.type` | string | The algorithm of the Linked Data signature. 
| `"RsaSignature2017"` | +| `object_integrity_proofs.cryptosuite` | string | The cryptographic suite of the object integrity proof. | `"eddsa-jcs-2022"` | +| `object_integrity_proofs.key_id` | string | The public key ID of the object integrity proof. | `"https://example.com/actor/1#main-key"` | +| `object_integrity_proofs.signature` | string | The integrity proof of the object in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `url.full` | string | The full URL being fetched by the document loader. | `"https://example.com/actor/1"` | +| `webfinger.handle.result` | string | Terminal outcome of an incoming WebFinger request: `resolved`, `invalid`, `not_found`, `tombstoned`, or `error`. | `"resolved"` | +| `webfinger.lookup.result` | string | Terminal outcome of an outgoing WebFinger lookup: `found`, `not_found`, `invalid`, `network_error`, or `error`. | `"found"` | +| `webfinger.resource` | string | The queried resource URI. | `"acct:fedify@hollo.social"` | +| `webfinger.resource.scheme` | string | The scheme of the queried resource URI. Metric attribute is bucketed to `acct`, `http`, `https`, `mailto`, or `other`. | `"acct"` | [attributes]: https://opentelemetry.io/docs/specs/otel/common/#attribute [OpenTelemetry Semantic Conventions]: https://opentelemetry.io/docs/specs/semconv/ diff --git a/docs/manual/tasks.md b/docs/manual/tasks.md index 599bed2f7..c95039495 100644 --- a/docs/manual/tasks.md +++ b/docs/manual/tasks.md @@ -349,11 +349,79 @@ collapsed onto one message. > owns an atomic check. +Observability +------------- + +*Task-specific telemetry is available since Fedify 2.3.0.* + +Each task the worker dequeues runs inside a `fedify.task` [OpenTelemetry] span +(a *consumer* span; since tasks are not part of ActivityPub, it is namespaced +under `fedify.` rather than `activitypub.`). 
The span inherits the trace +context captured at the enqueue site, so a task's processing chains to the +request or job that enqueued it—and every retry attempt chains to the same +parent. The span carries: + + - `fedify.task.name` — the registered task name. + - `fedify.task.attempt` — the zero-based attempt number; a retry re-enqueue + increments it. + - `fedify.task.failure_reason` — set only on a terminal failure, one of the + four bounded values below. + +On a terminal failure the span's status is also set to `ERROR`, so trace-based +error views surface dropped and given-up tasks together with their +`fedify.task.failure_reason`. A worker shutdown is the one exception: an +`aborted` attempt leaves the status unset, since an interruption is not a task +failure. + +Tasks also reuse the `fedify.queue.task.*` metric family (`enqueued`, +`started`, `completed`, `failed`, `duration`, `in_flight`) that the inbox, +outbox, and fanout workers already report. On a task run measurement +(`enqueued`, `started`, `completed`, `failed`, `duration`), +`fedify.queue.role` is `task` and `fedify.task.name` names the task; the +process-local `in_flight` UpDownCounter omits `fedify.task.name` so its +increments and decrements pair up cleanly. +`fedify.queue.backend` reflects the queue actually used after routing—so a task +that falls back to the `outboxQueue` (see +[Routing](#queue-routing-and-isolation)) is labeled with the outbox queue's +backend, not a task queue's. A failed outcome +also carries `fedify.task.failure_reason` on `fedify.queue.task.failed` and +`fedify.queue.task.duration`. + +The `fedify.task.failure_reason` attribute takes one of four bounded values, +mapping to the worker's dispatch decision points: + +| Value | Meaning | +| ----------------- | -------------------------------------------------- | +| `deserialization` | The wire payload could not be deserialized. | +| `validation` | The deserialized payload failed schema validation. 
| +| `unknown_task` | The task name has no registered handler. | +| `handler` | The registered handler threw. | + +The first three are *drops*: the payload cannot succeed by retrying, so the +worker acknowledges the message and does not re-enqueue it. Telemetry still +records these as a failed outcome with the matching reason, while the queue is +left drained—so a drop is observable without being retried. A `handler` +failure follows the configured retry policy (see +[Retries](#retry-and-error-handling)). A worker shutdown is never counted as a +failure: an interrupted attempt carries no `fedify.task.failure_reason`; it is +recorded as an `aborted` outcome when the abort propagates (on a `nativeRetrial` +queue) and is otherwise folded into a retry like any handler error. + +The bounded value set keeps metric cardinality finite: a metric's task name is +a registered, known-at-startup value, never derived from message content. An +`unknown_task` drop carries a wire-supplied name, so that name is kept off the +metrics (it still appears on the span, which does not aggregate into time +series). See the [OpenTelemetry](./opentelemetry.md) manual for the full span, +attribute, and metric reference. + +[OpenTelemetry]: https://opentelemetry.io/ + + Limitations ----------- -The current API intentionally ships without task-specific OpenTelemetry spans -and metrics, cron-style periodic scheduling, result backends, and per-task -priority. Some of these are planned as follow-ups; see the [tracking issue]. +The current API intentionally ships without cron-style periodic scheduling, +result backends, and per-task priority. Some of these are planned as +follow-ups; see the [tracking issue]. 
[tracking issue]: https://github.com/fedify-dev/fedify/issues/206 diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 2c53ca86d..358fe18e8 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -16,7 +16,7 @@ import type { MessageQueue } from "./mq.ts"; * The role of a queued task, derived from the queued message's `type` field. * @since 2.3.0 */ -export type QueueTaskRole = "fanout" | "outbox" | "inbox"; +export type QueueTaskRole = "fanout" | "outbox" | "inbox" | "task"; /** * The terminal result of a queued task processing attempt. @@ -91,6 +91,13 @@ export interface QueueTaskCommonAttributes { role: QueueTaskRole; queue?: MessageQueue; activityType?: string; + + /** + * The registered name of a custom background task, emitted as the + * `fedify.task.name` attribute. Set only for the `"task"` role. + * @since 2.3.0 + */ + taskName?: string; } /** @@ -209,6 +216,23 @@ export type HttpSignatureMetricFailureReason = | "invalidSignature" | "keyFetchError"; +/** + * The reason a custom background task terminated unsuccessfully, emitted as the + * `fedify.task.failure_reason` attribute. A small bounded set mapping to the + * worker's dispatch decision points; open to later refinement. + * + * - `deserialization`: the wire payload could not be deserialized. + * - `validation`: the deserialized payload failed schema validation. + * - `unknown_task`: the task name has no registered handler. + * - `handler`: the registered handler threw. + * @since 2.3.0 + */ +export type QueueTaskFailureReason = + | "deserialization" + | "validation" + | "unknown_task" + | "handler"; + /** * Bounded values recorded as `ld_signatures.type` on the signature * verification duration histogram. 
Fedify only signs and verifies @@ -1009,9 +1033,13 @@ class FederationMetrics { common: QueueTaskCommonAttributes, result: QueueTaskResult, durationMs: number, + failureReason?: QueueTaskFailureReason, ): void { const attributes = buildQueueTaskAttributes(common); attributes["fedify.queue.task.result"] = result; + if (failureReason != null && result === "failed") { + attributes["fedify.task.failure_reason"] = failureReason; + } if (result === "completed") { this.queueTaskCompleted.add(1, attributes); } else if (result === "failed") { @@ -1197,6 +1225,9 @@ function buildQueueTaskAttributes( if (common.activityType != null) { attributes["activitypub.activity.type"] = common.activityType; } + if (common.taskName != null) { + attributes["fedify.task.name"] = common.taskName; + } return attributes; } diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 0f93c900c..69002f270 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -141,6 +141,7 @@ import { isAbortError, type QueueDepthGaugeEntry, type QueueTaskCommonAttributes, + type QueueTaskFailureReason, type QueueTaskResult, recordCircuitBreakerStateChange, recordCollectionRequest, @@ -1192,7 +1193,73 @@ export class FederationImpl }, ); } else if (message.type === "task") { - await this.#listenTaskMessage(contextData, message); + const registered = this.taskDefinitions.get(message.taskName) != null; + const common: QueueTaskCommonAttributes = { + role: "task", + queue: this.resolveTaskQueue(message.taskName), + taskName: registered ? 
message.taskName : undefined, + }; + await tracer.startActiveSpan( + "fedify.task", + { + kind: SpanKind.CONSUMER, + attributes: { + "fedify.task.name": message.taskName, + "fedify.task.attempt": message.attempt, + }, + }, + extractedContext, + async (span) => { + const spanCtx = span.spanContext(); + return await withContext( + { traceId: spanCtx.traceId, spanId: spanCtx.spanId }, + async () => { + meter.recordQueueTaskStarted(common); + meter.incrementQueueTaskInFlight(common); + const startedAt = performance.now(); + const recordOutcome = ( + outcome: QueueTaskResult, + failureReason: QueueTaskFailureReason | undefined, + error?: unknown, + ): void => { + if (failureReason != null) { + span.setAttribute( + "fedify.task.failure_reason", + failureReason, + ); + span.setStatus({ + code: SpanStatusCode.ERROR, + ...(error == null ? {} : { message: String(error) }), + }); + } + meter.recordQueueTaskOutcome( + common, + outcome, + getDurationMs(startedAt), + outcome === "failed" ? failureReason : undefined, + ); + }; + try { + const failureReason = await this.#listenTaskMessage( + contextData, + message, + ); + recordOutcome( + failureReason == null ? 
"completed" : "failed", + failureReason, + ); + } catch (e) { + if (isAbortError(e)) recordOutcome("aborted", undefined); + else recordOutcome("failed", "handler", e); + throw e; + } finally { + meter.decrementQueueTaskInFlight(common); + span.end(); + } + }, + ); + }, + ); } }); } @@ -2091,7 +2158,7 @@ export class FederationImpl async #listenTaskMessage( contextData: TContextData, message: TaskMessage, - ): Promise { + ): Promise { const logger = getLogger(["fedify", "federation", "task"]); const def = this.taskDefinitions.get(message.taskName); if (def == null) { @@ -2101,30 +2168,39 @@ export class FederationImpl "dropping.", { taskName: message.taskName }, ); - return; + return "unknown_task"; } const context = this.#createContext(new URL(message.baseUrl), contextData); - let data: unknown; - try { - // decode() deserializes then re-validates at the dequeue boundary - // (drift protection): a durable queue can hand a new deploy a payload - // an old deploy enqueued. - data = await context.codec.decode(def.schema, message.data); - } catch (error) { - // A malformed or incompatible payload won't succeed by retrying. + const deserialized = await context.codec.deserialize(message.data) + .then((value) => ({ ok: true, value }) as const) + .catch((error) => ({ ok: false, error }) as const); + if (!deserialized.ok) { logger.error( - "Custom task {taskName} payload could not be decoded or validated; " + + "Custom task {taskName} payload could not be deserialized; " + "dropping:\n{error}", - { taskName: message.taskName, error }, + { taskName: message.taskName, error: deserialized.error }, ); - return; + return "deserialization"; + } + const data = await context.codec.validate(def.schema, deserialized.value) + .then((value) => ({ ok: true, value }) as const) + .catch((error) => ({ ok: false, error }) as const); + if (!data.ok) { + // An incompatible payload won't succeed by retrying. 
+ logger.error( + "Custom task {taskName} payload failed schema validation; " + + "dropping:\n{error}", + { taskName: message.taskName, error: data.error }, + ); + return "validation"; } try { - await def.handler(context, data); + await def.handler(context, data.value); + return undefined; } catch (error) { if (def.onError != null) { try { - await def.onError(context, error, data); + await def.onError(context, error, data.value); } catch (onErrorError) { logger.error( "onError for custom task {taskName} threw:\n{error}", @@ -2169,6 +2245,10 @@ export class FederationImpl delay: clampNegativeDelay(delay), orderingKey: message.orderingKey, }); + getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + { role: "task", queue, taskName: message.taskName }, + retryMessage.attempt, + ); } else { logger.error( "Custom task {taskName} failed after {attempt} attempts; giving " + @@ -2176,6 +2256,8 @@ export class FederationImpl { taskName: message.taskName, attempt: message.attempt, error }, ); } + // A swallowed abort is a graceful interruption, not a task failure. + return isAbortError(error) ? undefined : "handler"; } } diff --git a/packages/fedify/src/federation/tasks/codec.ts b/packages/fedify/src/federation/tasks/codec.ts index e649ce061..d29d682c5 100644 --- a/packages/fedify/src/federation/tasks/codec.ts +++ b/packages/fedify/src/federation/tasks/codec.ts @@ -17,9 +17,8 @@ export default class TaskCodec { serialize = (data: unknown): Promise => stringifyAsync(data, { Vocab: this.#stringifyVocab }); - /** Deserializes `raw`, rebuilding any encoded vocabulary object. */ - deserialize = (raw: string): Promise => - this.#revive(new Map())(parse(raw, { Vocab: VocabHolder.from })); + deserialize = async (raw: string): Promise => + await this.#revive(new Map())(parse(raw, { Vocab: VocabHolder.from })); /** Validates `data` against `schema`, then serializes it. 
*/ encode = async ( @@ -34,7 +33,19 @@ export default class TaskCodec { ): Promise> => TaskCodec.validate(schema, await this.deserialize(raw)); - /** Validates `data` against `schema`, returning its parsed output. */ + /** + * Validates an already-deserialized `data` against `schema`. An instance + * wrapper over {@link TaskCodec.validate} so the dispatch site can split + * {@link decode} into its deserialize and validate phases—telling a + * deserialization failure apart from a validation failure—without importing + * the class. + */ + validate = ( + schema: S, + data: unknown, + ): Promise> => + TaskCodec.validate(schema, data); + static validate = async ( schema: S, data: unknown, diff --git a/packages/fedify/src/federation/tasks/enqueue.ts b/packages/fedify/src/federation/tasks/enqueue.ts index a4009aa1a..acdd9dae0 100644 --- a/packages/fedify/src/federation/tasks/enqueue.ts +++ b/packages/fedify/src/federation/tasks/enqueue.ts @@ -9,6 +9,7 @@ import { getLogger } from "@logtape/logtape"; import { context, propagation } from "@opentelemetry/api"; import type { KvKey } from "../kv.ts"; +import { getFederationMetrics } from "../metrics.ts"; import type { FederationImpl } from "../middleware.ts"; import { type MessageQueue, ParallelMessageQueue } from "../mq.ts"; import type { TaskMessage } from "../queue.ts"; @@ -100,6 +101,15 @@ const enqueueTasks = ( orderingKey: options.orderingKey, deduplicationKey: claim.forwardedDeduplicationKey, }); + // Counted only after a genuine dispatch: a dedup skip returns before + // this, and a failed dispatch throws into the rollback below. 
+ const meter = getFederationMetrics(ctx.federation.meterProvider); + for (const message of messages) { + meter.recordQueueTaskEnqueued( + { role: "task", queue, taskName: task.name }, + message.attempt, + ); + } } catch (error) { if (claim.rollback != null) { try { diff --git a/packages/fedify/src/federation/tasks/tasks.test.ts b/packages/fedify/src/federation/tasks/tasks.test.ts index e54ad8ff6..e62bb0945 100644 --- a/packages/fedify/src/federation/tasks/tasks.test.ts +++ b/packages/fedify/src/federation/tasks/tasks.test.ts @@ -1,5 +1,12 @@ -import { mockDocumentLoader, test } from "@fedify/fixture"; +import { + createTestMeterProvider, + createTestTracerProvider, + mockDocumentLoader, + test, +} from "@fedify/fixture"; import { Note } from "@fedify/vocab"; +import { propagation, SpanStatusCode } from "@opentelemetry/api"; +import { W3CTraceContextPropagator } from "@opentelemetry/core"; import { delay } from "es-toolkit"; import { deepStrictEqual, @@ -17,7 +24,7 @@ import { } from "../../testing/mod.ts"; import { createFederationBuilder } from "../builder.ts"; import type { Context } from "../context.ts"; -import type { Federatable } from "../federation.ts"; +import type { Federatable, FederationOptions } from "../federation.ts"; import { type KvKey, type KvStore, @@ -1203,3 +1210,426 @@ test( deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); }, ); + +/** Wires test telemetry doubles into a fresh federation for the suite below. 
*/ +const instrument = (options: FederationOptions) => { + const [meterProvider, recorder] = createTestMeterProvider(); + const [tracerProvider, exporter] = createTestTracerProvider(); + const federation = createFederation({ + ...options, + meterProvider, + tracerProvider, + }) as FederationImpl; + return { federation, recorder, exporter }; +}; + +test("task observability", async (t) => { + await t.step( + "opens a fedify.task span carrying name and attempt on success", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("sendDigest", { + schema: stringSchema, + handler: () => {}, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("sendDigest", "payload"), + ); + + const spans = exporter.getSpans("fedify.task"); + strictEqual(spans.length, 1); + strictEqual(spans[0].attributes["fedify.task.name"], "sendDigest"); + strictEqual(spans[0].attributes["fedify.task.attempt"], 0); + // A completed task carries no failure reason on its span… + strictEqual(spans[0].attributes["fedify.task.failure_reason"], undefined); + strictEqual(spans[0].status.code, SpanStatusCode.UNSET); + + const started = recorder.getMeasurements("fedify.queue.task.started"); + strictEqual(started.length, 1); + strictEqual(started[0].attributes["fedify.queue.role"], "task"); + strictEqual(started[0].attributes["fedify.task.name"], "sendDigest"); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + strictEqual(completed.length, 1); + strictEqual(completed[0].attributes["fedify.queue.role"], "task"); + strictEqual(completed[0].attributes["fedify.task.name"], "sendDigest"); + strictEqual( + completed[0].attributes["fedify.queue.task.result"], + "completed", + ); + // …nor on its outcome metric. 
+ strictEqual( + completed[0].attributes["fedify.task.failure_reason"], + undefined, + ); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + }, + ); + + await t.step( + "inherits the parent trace context from the enqueue site", + async () => { + // The worker extracts the parent through the global propagator; a real + // W3C propagator is required because the default is a no-op. + const traceId = "0af7651916cd43dd8448eb211c80319c"; + const spanId = "b7ad6b7169203331"; + propagation.setGlobalPropagator(new W3CTraceContextPropagator()); + try { + const queue = new MockQueue(); + const { federation, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("traced", { + schema: stringSchema, + handler: () => {}, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("traced", "payload", { + traceContext: { + traceparent: `00-${traceId}-${spanId}-01`, + }, + }), + ); + + const span = exporter.getSpans("fedify.task")[0]; + ok(span != null); + strictEqual(span.spanContext().traceId, traceId); + strictEqual(span.parentSpanContext?.spanId, spanId); + } finally { + propagation.disable(); + } + }, + ); + + await t.step( + "attributes a deserialization failure and drops without retry", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + let called = 0; + federation.defineTask("decode-me", { + schema: stringSchema, + handler: () => { + called++; + }, + }); + const message = await makeTaskMessage("decode-me", "payload"); + await federation.processQueuedTask(undefined, { + ...message, + data: "garbage that is not devalue", + }); + + strictEqual(called, 0); + strictEqual(queue.enqueued.length, 0); // dropped, not retried + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + strictEqual(failed.length, 1); + 
strictEqual(failed[0].attributes["fedify.queue.role"], "task"); + strictEqual( + failed[0].attributes["fedify.task.failure_reason"], + "deserialization", + ); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual( + span.attributes["fedify.task.failure_reason"], + "deserialization", + ); + // A dropped payload is a failed outcome, so the span status is ERROR. + strictEqual(span.status.code, SpanStatusCode.ERROR); + }, + ); + + await t.step( + "attributes a validation failure and drops without retry", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + let called = 0; + federation.defineTask("strict-shape", { + schema: numberSchema, // expects a number… + handler: () => { + called++; + }, + }); + // …but a valid devalue payload carries a string. + await federation.processQueuedTask( + undefined, + await makeTaskMessage("strict-shape", "not a number"), + ); + + strictEqual(called, 0); + strictEqual(queue.enqueued.length, 0); // dropped, not retried + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + strictEqual(failed.length, 1); + strictEqual( + failed[0].attributes["fedify.task.failure_reason"], + "validation", + ); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual( + span.attributes["fedify.task.failure_reason"], + "validation", + ); + strictEqual(span.status.code, SpanStatusCode.ERROR); + }, + ); + + await t.step( + "attributes an unknown task and drops without retry", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("never-registered", "payload"), + ); + + strictEqual(queue.enqueued.length, 0); // dropped, not retried + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + strictEqual(failed.length, 1); + 
strictEqual( + failed[0].attributes["fedify.task.failure_reason"], + "unknown_task", + ); + // The backend label is still populated on an unknown-task drop. + strictEqual(failed[0].attributes["fedify.queue.backend"], "MockQueue"); + // Cardinality guard: an unregistered, wire-derived task name must NOT + // become a metric attribute—it would spawn unbounded time series… + strictEqual(failed[0].attributes["fedify.task.name"], undefined); + strictEqual( + recorder.getMeasurements("fedify.queue.task.started")[0] + .attributes["fedify.task.name"], + undefined, + ); + const span = exporter.getSpans("fedify.task")[0]; + // …but the span still carries the real name for tracing the drop. + strictEqual(span.attributes["fedify.task.name"], "never-registered"); + strictEqual( + span.attributes["fedify.task.failure_reason"], + "unknown_task", + ); + strictEqual(span.status.code, SpanStatusCode.ERROR); + }, + ); + + await t.step( + "attributes a handler failure on a terminal give-up", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("explode", { + schema: stringSchema, + handler: () => { + throw new Error("boom"); + }, + retryPolicy: () => null, // give up immediately + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("explode", "payload"), + ); + + strictEqual(queue.enqueued.length, 0); // gave up, no retry + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + strictEqual(failed.length, 1); + strictEqual( + failed[0].attributes["fedify.task.failure_reason"], + "handler", + ); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual( + span.attributes["fedify.task.failure_reason"], + "handler", + ); + // A terminal give-up is a failed outcome, so the span status is ERROR. 
+ strictEqual(span.status.code, SpanStatusCode.ERROR); + }, + ); + + await t.step( + "reports the resolved outbox queue as the backend on fallback", + async () => { + // A distinctly named queue so the backend label is unambiguous. + class FallbackOutboxQueue extends MockQueue {} + const outboxQueue = new FallbackOutboxQueue(); + const { federation, recorder } = instrument({ + ...baseOptions, + queue: { outbox: outboxQueue }, // no dedicated task queue + }); + federation.defineTask("fallback", { + schema: stringSchema, + handler: () => {}, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("fallback", "payload"), + ); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + strictEqual(completed.length, 1); + strictEqual( + completed[0].attributes["fedify.queue.backend"], + "FallbackOutboxQueue", + ); + }, + ); + + await t.step( + "records an enqueue measurement with role task", + async () => { + const queue = new MockQueue(); + const { federation, recorder } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("enqueue-me", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload"); + + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + strictEqual(enqueued.length, 1); + strictEqual(enqueued[0].attributes["fedify.queue.role"], "task"); + strictEqual(enqueued[0].attributes["fedify.task.name"], "enqueue-me"); + strictEqual(enqueued[0].attributes["fedify.queue.task.attempt"], 0); + }, + ); + + await t.step( + "records the retry re-enqueue with role task and a bumped attempt", + async () => { + const queue = new MockQueue(); + const { federation, recorder } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("retry-me", { + schema: stringSchema, + handler: () => { + throw new 
Error("boom"); + }, + retryPolicy: () => Temporal.Duration.from({ milliseconds: 1 }), + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("retry-me", "payload"), + ); + + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].message.attempt, 1); + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + strictEqual(enqueued.length, 1); + strictEqual(enqueued[0].attributes["fedify.queue.role"], "task"); + strictEqual(enqueued[0].attributes["fedify.task.name"], "retry-me"); + strictEqual(enqueued[0].attributes["fedify.queue.task.attempt"], 1); + }, + ); + + await t.step( + "records an abort as aborted, without a failure reason or error status", + async () => { + const queue = new MockQueue({ nativeRetrial: true }); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("aborts", { + schema: stringSchema, + handler: () => { + throw globalThis.Object.assign(new Error("shutting down"), { + name: "AbortError", + }); + }, + }); + const message = await makeTaskMessage("aborts", "payload"); + await rejects( + () => federation.processQueuedTask(undefined, message), + { name: "AbortError" }, + ); + + const span = exporter.getSpans("fedify.task")[0]; + ok(span != null); + strictEqual(span.attributes["fedify.task.failure_reason"], undefined); + strictEqual(span.status.code, SpanStatusCode.UNSET); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + const durations = recorder.getMeasurements("fedify.queue.task.duration"); + strictEqual(durations.length, 1); + strictEqual( + durations[0].attributes["fedify.queue.task.result"], + "aborted", + ); + }, + ); + + await t.step( + "on a non-native queue an aborted handler is retried, not failed", + async () => { + const queue = new MockQueue(); // nativeRetrial: false + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: 
queue }, + }); + federation.defineTask("aborts-soft", { + schema: stringSchema, + handler: () => { + throw globalThis.Object.assign(new Error("shutting down"), { + name: "AbortError", + }); + }, + retryPolicy: () => Temporal.Duration.from({ milliseconds: 1 }), + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("aborts-soft", "payload"), + ); + + strictEqual(queue.enqueued.length, 1); // retried, behavior unchanged + strictEqual(queue.enqueued[0].message.attempt, 1); + // No `handler` failure leaks onto the span or any failure metric… + const span = exporter.getSpans("fedify.task")[0]; + strictEqual(span.attributes["fedify.task.failure_reason"], undefined); + strictEqual(span.status.code, SpanStatusCode.UNSET); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + // …and the swallowed-into-retry attempt records `completed`, matching the + // inbox/outbox internal-retry convention. + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + strictEqual(completed.length, 1); + strictEqual( + completed[0].attributes["fedify.task.failure_reason"], + undefined, + ); + }, + ); +}); From a2788986f1efe38f1a0311d8b1f3700254604e7b Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Thu, 11 Jun 2026 13:57:26 +0000 Subject: [PATCH 3/6] Drop the build step from the Deno test task MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deno executes the TypeScript sources directly, so `test:deno` spent most of its time on a `build` it never needed: with no dist/ output present, the whole Deno suite passes except the npm packaging regression tests added for https://github.com/fedify-dev/fedify/issues/655, which assert that the built package.json entry points of `@fedify/cli`, `@fedify/create`, and `@fedify/init` exist. 
Those checks guard the npm artifacts, not the Deno runtime, and still run under `test:node` and `test:bun`, which build first—so skip them under Deno and drop the `build` dependency from `test:deno`. `@fedify/lint`'s oxlint integration test already skips itself when *dist/oxlint.js* is absent. Update AGENTS.md to match: document `mise run build`/`prepare-each` for building, `check-each` and `test-each` for scoping work to specific packages, recommend the now build-free `test:deno` as the default test loop during development, and add a section directing agents to consult `mise tasks`. Assisted-by: Claude Code:claude-opus-4-8 Assisted-by: Claude Code:claude-fable-5 --- AGENTS.md | 28 +++++++++++++++++++++++----- mise.toml | 2 +- packages/cli/src/startup.test.ts | 5 ++++- packages/create/src/package.test.ts | 1 + packages/init/src/package.test.ts | 1 + 5 files changed, 30 insertions(+), 7 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 7f94255ee..35c48b786 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -145,11 +145,29 @@ Development workflow the editor/LSP after editing YAML. - **Building Packages**: All packages are built automatically as part of setup. Run `mise run build` to rebuild everything, or - `mise run prepare-each ` to rebuild just one (without the `@fedify/` - prefix). - - **Checking Code**: Run `mise run check` before committing. - - **Running Tests**: Use `mise run test:deno` for Deno tests or - `mise run test` for all environments. + `mise run prepare-each ` to rebuild specific packages (without the + `@fedify/` prefix). + - **Checking Code**: Run `mise run check` before committing, or run + `mise run check-each ` to check specific packages. If `check:fmt`, + `check:lint`, or `check:md` reports any issues, refer to the + **Formatting and Linting** item below. + - **Formatting and Linting**: Run `mise run fmt` to format all code and docs. 
+ - **Running Tests**: + While testing is certainly important, blindly running every test suite every + time is inefficient. Since Deno executes TS source code directly, it doesn't + waste resources on builds. Therefore, during development, run + `mise run test:deno {TEST_PATH} --filter <TEST_NAME>` for most tests that + are independent of the runtime. If the test is dependent on a specific + runtime other than Deno, replace `test:deno` with `test:node` or `test:bun`. + Once development is complete, run `mise run test-each <pkgs>` to test the + modified packages (without the `@fedify/` prefix). + Finally, when ready for deployment, run `mise run test` to run the + whole codebase-wide test suite. + - `mise run test`: Executes all the tests in every runtime. + - `mise run test:<runtime>`: + Executes all the tests in the given runtime. + - `mise run test-each <pkgs>`: Executes tests in packages that include + `pkgs` in every runtime (without the `@fedify/` prefix). For detailed contribution guidelines, see *CONTRIBUTING.md*. diff --git a/mise.toml b/mise.toml index 7ae57df3b..1fbf82a53 100644 --- a/mise.toml +++ b/mise.toml @@ -246,9 +246,9 @@ for pkg in ($env.usage_packages | split row " " | where ($it | is-not-empty)) { ''' # Testing + [tasks."test:deno"] description = "Run the test suite using Deno" -depends = ["build"] run = "deno test --check --doc --allow-all --unstable-kv --trace-leaks --parallel" [tasks."test:node"] diff --git a/packages/cli/src/startup.test.ts b/packages/cli/src/startup.test.ts index e73f5098f..f13e1170d 100644 --- a/packages/cli/src/startup.test.ts +++ b/packages/cli/src/startup.test.ts @@ -5,7 +5,10 @@ import test from "node:test"; import { fileURLToPath } from "node:url"; const packageDir = resolve(dirname(fileURLToPath(import.meta.url)), ".."); -test("CLI build keeps the init command bridge", async () => { + +test("CLI build keeps the init command bridge", { + skip: "Deno" in globalThis, +}, async () => { const entrypoint = resolve(packageDir, "dist/mod.js"); const 
commandBridge = resolve(packageDir, "dist/commands.js"); await access(entrypoint); diff --git a/packages/create/src/package.test.ts b/packages/create/src/package.test.ts index ba50b3660..4a753d467 100644 --- a/packages/create/src/package.test.ts +++ b/packages/create/src/package.test.ts @@ -8,6 +8,7 @@ const packageDir = resolve(dirname(fileURLToPath(import.meta.url)), ".."); test( "package.json entrypoints match built create CLI", + { skip: "Deno" in globalThis }, async () => { const packageJson = JSON.parse( await readFile(resolve(packageDir, "package.json"), "utf8"), diff --git a/packages/init/src/package.test.ts b/packages/init/src/package.test.ts index 97a33247b..39cf17011 100644 --- a/packages/init/src/package.test.ts +++ b/packages/init/src/package.test.ts @@ -13,6 +13,7 @@ async function assertTargetExists(path: string): Promise { test( "package.json entrypoints match built init files", + { skip: "Deno" in globalThis }, async () => { const packageJson = JSON.parse( await readFile(resolve(packageDir, "package.json"), "utf8"), From 1fb7154a30fb6e5080c504d0df86c83443e07701 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Sat, 4 Jul 2026 08:46:26 +0000 Subject: [PATCH 4/6] Record only terminal task failures in telemetry The fedify.task span and the fedify.queue.task.* metrics promised that fedify.task.failure_reason is set only on a terminal failure, but the task worker attributed a handler failure to every thrown attempt, even when the retry policy had just scheduled a re-enqueue. A transient error that later succeeded thus produced a failed measurement and an ERROR span, inflating failure alerts and diverging from the inbox/outbox convention where an attempt folded into a retry records result=completed. To fix this at the decision point instead of patching each call site, the task listener now returns a dispatch result that distinguishes the outcome: - A handler error folded into a scheduled retry records completed. 
- Only a terminal give-up records failed with the handler reason. - An aborted attempt that the retry policy abandons records aborted rather than completed, so tasks dropped during a graceful shutdown are no longer invisible to telemetry. TaskCodec.decode() now reports which phase failed (deserialization or validation) instead of throwing, so the worker no longer re-composes the codec's deserialize/validate pipeline inline to tell the two apart; the redundant instance validate() wrapper is removed. The retry-path test now pins the decided semantics (no failed measurement, span status UNSET), a new test covers the abandoned-abort case, and the manual's failed/aborted definitions are updated to match. https://github.com/fedify-dev/fedify/pull/812#discussion_r3517778514 https://github.com/fedify-dev/fedify/pull/812#discussion_r3517806156 Assisted-by: Claude Code:claude-fable-5 Assisted-by: Codex:gpt-5.5 --- docs/manual/opentelemetry.md | 13 ++-- docs/manual/tasks.md | 11 ++-- .../fedify/src/federation/middleware.test.ts | 7 ++- packages/fedify/src/federation/middleware.ts | 62 ++++++++++--------- .../fedify/src/federation/tasks/codec.test.ts | 35 +++++++---- packages/fedify/src/federation/tasks/codec.ts | 36 ++++++----- .../fedify/src/federation/tasks/tasks.test.ts | 58 ++++++++++++++++- 7 files changed, 155 insertions(+), 67 deletions(-) diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index 77c7233d3..94526d8b1 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -826,12 +826,15 @@ Fedify records the following OpenTelemetry metrics: `fedify.queue.task.result`, which is `completed` when processing returned without throwing, `failed` when processing did not succeed (for inbox and outbox, the worker re-threw a non-abort error; for a custom task, either - the handler threw or the payload was dropped—`deserialization`, - `validation`, or `unknown_task`—in which case the message is still acked - but the outcome is 
recorded as `failed` with a - `fedify.task.failure_reason`), and `aborted` when the worker re-threw an + the payload was dropped—`deserialization`, `validation`, or + `unknown_task`—in which case the message is still acked but the outcome + is recorded as `failed` with a `fedify.task.failure_reason`, or the + handler threw and the retry policy declined another attempt, recorded + with the `handler` reason—a handler error folded into a scheduled retry + records `completed` instead), and `aborted` when the worker re-threw an `AbortError` (for example, because a graceful-shutdown `AbortSignal` - interrupted processing). When the queue backend does not declare + interrupted processing) or when an aborted custom task attempt was + abandoned without a retry. When the queue backend does not declare `nativeRetrial`, Fedify catches inbox listener and outbox delivery errors itself; if its retry policy still allows another attempt, it schedules a retry by re-enqueuing the message and returns from the worker without diff --git a/docs/manual/tasks.md b/docs/manual/tasks.md index c95039495..96c90fabe 100644 --- a/docs/manual/tasks.md +++ b/docs/manual/tasks.md @@ -402,10 +402,13 @@ worker acknowledges the message and does not re-enqueue it. Telemetry still records these as a failed outcome with the matching reason, while the queue is left drained—so a drop is observable without being retried. A `handler` failure follows the configured retry policy (see -[Retries](#retry-and-error-handling)). A worker shutdown is never counted as a -failure: an interrupted attempt carries no `fedify.task.failure_reason`, -recorded as an `aborted` outcome when the abort propagates (on a `nativeRetrial` -queue) and otherwise folded into a retry like any handler error. +[Retries](#retry-and-error-handling)): an attempt folded into a scheduled +retry records a `completed` outcome, and only the terminal give-up records +`failed` with the `handler` reason. 
A worker shutdown is never counted as a +failure: an interrupted attempt carries no `fedify.task.failure_reason`—it is +recorded as an `aborted` outcome when the abort propagates (on a +`nativeRetrial` queue) or when the retry policy declines another attempt, and +otherwise folded into a scheduled retry like any handler error. The bounded value set keeps metric cardinality finite: a metric's task name is a registered, known-at-startup value, never derived from message content—an diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 6ff54ca91..d1e3c3882 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -10506,8 +10506,11 @@ test("createFederation() omits instrumentation when no meterProvider is set", () }); const taskCodec = new TaskCodec({ contextLoader: mockDocumentLoader }); -const decodeEnvelope = (message: TaskMessage): Promise => - taskCodec.decode(envelopeSchema, message.data); +const decodeEnvelope = async (message: TaskMessage): Promise => { + const decoded = await taskCodec.decode(envelopeSchema, message.data); + if (!decoded.ok) throw decoded.error; + return decoded.value; +}; const envelope = (title: string): Envelope => ({ note: new Note({ content: title }), title, diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 69002f270..433362eb5 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -177,6 +177,13 @@ import { import { hasMalformedKnownTemporalLiteral } from "./temporal.ts"; import { handleWebFinger } from "./webfinger.ts"; +type QueueTaskDispatchResult = + | { readonly outcome: "completed" | "aborted" } + | { + readonly outcome: "failed"; + readonly failureReason: QueueTaskFailureReason; + }; + const circuitBreakerCasWarningKvStores = new WeakSet(); let nextQueueDepthGaugeSourceId = 0; const 
retryAfterHttpDate = new RegExp( @@ -1240,13 +1247,15 @@ export class FederationImpl ); }; try { - const failureReason = await this.#listenTaskMessage( + const result = await this.#listenTaskMessage( contextData, message, ); recordOutcome( - failureReason == null ? "completed" : "failed", - failureReason, + result.outcome, + result.outcome === "failed" + ? result.failureReason + : undefined, ); } catch (e) { if (isAbortError(e)) recordOutcome("aborted", undefined); @@ -2158,7 +2167,7 @@ export class FederationImpl async #listenTaskMessage( contextData: TContextData, message: TaskMessage, - ): Promise { + ): Promise { const logger = getLogger(["fedify", "federation", "task"]); const def = this.taskDefinitions.get(message.taskName); if (def == null) { @@ -2168,35 +2177,29 @@ export class FederationImpl "dropping.", { taskName: message.taskName }, ); - return "unknown_task"; + return { outcome: "failed", failureReason: "unknown_task" }; } const context = this.#createContext(new URL(message.baseUrl), contextData); - const deserialized = await context.codec.deserialize(message.data) - .then((value) => ({ ok: true, value }) as const) - .catch((error) => ({ ok: false, error }) as const); - if (!deserialized.ok) { - logger.error( - "Custom task {taskName} payload could not be deserialized; " + - "dropping:\n{error}", - { taskName: message.taskName, error: deserialized.error }, - ); - return "deserialization"; - } - const data = await context.codec.validate(def.schema, deserialized.value) - .then((value) => ({ ok: true, value }) as const) - .catch((error) => ({ ok: false, error }) as const); + const data = await context.codec.decode(def.schema, message.data); if (!data.ok) { - // An incompatible payload won't succeed by retrying. 
- logger.error( - "Custom task {taskName} payload failed schema validation; " + - "dropping:\n{error}", - { taskName: message.taskName, error: data.error }, - ); - return "validation"; + if (data.phase === "deserialization") { + logger.error( + "Custom task {taskName} payload could not be deserialized; " + + "dropping:\n{error}", + { taskName: message.taskName, error: data.error }, + ); + } else { + logger.error( + "Custom task {taskName} payload failed schema validation; " + + "dropping:\n{error}", + { taskName: message.taskName, error: data.error }, + ); + } + return { outcome: "failed", failureReason: data.phase }; } try { await def.handler(context, data.value); - return undefined; + return { outcome: "completed" }; } catch (error) { if (def.onError != null) { try { @@ -2249,6 +2252,7 @@ export class FederationImpl { role: "task", queue, taskName: message.taskName }, retryMessage.attempt, ); + return { outcome: "completed" }; } else { logger.error( "Custom task {taskName} failed after {attempt} attempts; giving " + @@ -2257,7 +2261,9 @@ export class FederationImpl ); } // A swallowed abort is a graceful interruption, not a task failure. - return isAbortError(error) ? undefined : "handler"; + return isAbortError(error) + ? 
{ outcome: "aborted" } + : { outcome: "failed", failureReason: "handler" }; } } diff --git a/packages/fedify/src/federation/tasks/codec.test.ts b/packages/fedify/src/federation/tasks/codec.test.ts index fcc84de0e..d4fae0463 100644 --- a/packages/fedify/src/federation/tasks/codec.test.ts +++ b/packages/fedify/src/federation/tasks/codec.test.ts @@ -328,9 +328,10 @@ test("TaskCodec.encode() / decode()", async (t) => { const wire = await codec.encode(schema, payload); strictEqual(typeof wire, "string"); const back = await codec.decode(schema, wire); - ok(back.note instanceof Note); - strictEqual(back.note.content?.toString(), "Hi"); - strictEqual(back.title, "greeting"); + if (!back.ok) throw back.error; + ok(back.value.note instanceof Note); + strictEqual(back.value.note.content?.toString(), "Hi"); + strictEqual(back.value.title, "greeting"); }, ); @@ -342,18 +343,27 @@ test("TaskCodec.encode() / decode()", async (t) => { }); await t.step( - "decode() re-validates and rejects a drifted payload", + "decode() reports a validation phase for a drifted payload", async () => { // Encode under a permissive schema, decode under the strict one. 
const loose = makeSchema((_data): _data is unknown => true); const wire = await codec.encode(loose, { note: "not a note" }); - await rejects( - () => codec.decode(schema, wire), - { name: "TypeError", message: /Task data failed schema validation/ }, - ); + const result = await codec.decode(schema, wire); + strictEqual(result.ok, false); + if (result.ok) throw new Error("Expected decode to fail."); + strictEqual(result.phase, "validation"); + ok(result.error instanceof TypeError); }, ); + await t.step("decode() reports a deserialization phase", async () => { + const result = await codec.decode(schema, "not devalue"); + strictEqual(result.ok, false); + if (result.ok) throw new Error("Expected decode to fail."); + strictEqual(result.phase, "deserialization"); + ok(result.error instanceof Error); + }); + await t.step( "a non-idempotent (transforming) schema fails to round-trip", async () => { @@ -370,10 +380,11 @@ test("TaskCodec.encode() / decode()", async (t) => { }, }; const wire = await codec.encode(transforming, "hello"); - await rejects( - () => codec.decode(transforming, wire), - { name: "TypeError", message: /Task data failed schema validation/ }, - ); + const result = await codec.decode(transforming, wire); + strictEqual(result.ok, false); + if (result.ok) throw new Error("Expected decode to fail."); + strictEqual(result.phase, "validation"); + ok(result.error instanceof TypeError); }, ); }); diff --git a/packages/fedify/src/federation/tasks/codec.ts b/packages/fedify/src/federation/tasks/codec.ts index d29d682c5..39ce04fd8 100644 --- a/packages/fedify/src/federation/tasks/codec.ts +++ b/packages/fedify/src/federation/tasks/codec.ts @@ -4,6 +4,14 @@ import type { TracerProvider } from "@opentelemetry/api"; import type { StandardSchemaV1 } from "@standard-schema/spec"; import { parse, stringifyAsync } from "devalue"; +type TaskCodecDecodeResult = + | { readonly ok: true; readonly value: StandardSchemaV1.InferOutput } + | { + readonly ok: false; + readonly 
phase: "deserialization" | "validation"; + readonly error: unknown; + }; + /** * Serializes and deserializes task payloads for the queue, preserving * `@fedify/vocab` objects across the wire by reducing them to JSON-LD and @@ -30,21 +38,19 @@ export default class TaskCodec { decode = async ( schema: S, raw: string, - ): Promise> => - TaskCodec.validate(schema, await this.deserialize(raw)); - - /** - * Validates an already-deserialized `data` against `schema`. An instance - * wrapper over {@link TaskCodec.validate} so the dispatch site can split - * {@link decode} into its deserialize and validate phases—telling a - * deserialization failure apart from a validation failure—without importing - * the class. - */ - validate = ( - schema: S, - data: unknown, - ): Promise> => - TaskCodec.validate(schema, data); + ): Promise> => { + let data: unknown; + try { + data = await this.deserialize(raw); + } catch (error) { + return { ok: false, phase: "deserialization", error }; + } + try { + return { ok: true, value: await TaskCodec.validate(schema, data) }; + } catch (error) { + return { ok: false, phase: "validation", error }; + } + }; static validate = async ( schema: S, diff --git a/packages/fedify/src/federation/tasks/tasks.test.ts b/packages/fedify/src/federation/tasks/tasks.test.ts index e62bb0945..28fce13d1 100644 --- a/packages/fedify/src/federation/tasks/tasks.test.ts +++ b/packages/fedify/src/federation/tasks/tasks.test.ts @@ -1525,7 +1525,7 @@ test("task observability", async (t) => { "records the retry re-enqueue with role task and a bumped attempt", async () => { const queue = new MockQueue(); - const { federation, recorder } = instrument({ + const { federation, recorder, exporter } = instrument({ ...baseOptions, queue: { task: queue }, }); @@ -1548,6 +1548,19 @@ test("task observability", async (t) => { strictEqual(enqueued[0].attributes["fedify.queue.role"], "task"); strictEqual(enqueued[0].attributes["fedify.task.name"], "retry-me"); 
strictEqual(enqueued[0].attributes["fedify.queue.task.attempt"], 1); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual(span.attributes["fedify.task.failure_reason"], undefined); + strictEqual(span.status.code, SpanStatusCode.UNSET); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + strictEqual(completed.length, 1); + strictEqual( + completed[0].attributes["fedify.task.failure_reason"], + undefined, + ); }, ); @@ -1590,6 +1603,49 @@ test("task observability", async (t) => { }, ); + await t.step( + "on a non-native queue an aborted handler that gives up is aborted", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("aborts-give-up", { + schema: stringSchema, + handler: () => { + throw globalThis.Object.assign(new Error("shutting down"), { + name: "AbortError", + }); + }, + retryPolicy: () => null, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("aborts-give-up", "payload"), + ); + + strictEqual(queue.enqueued.length, 0); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual(span.attributes["fedify.task.failure_reason"], undefined); + strictEqual(span.status.code, SpanStatusCode.UNSET); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + strictEqual( + recorder.getMeasurements("fedify.queue.task.completed").length, + 0, + ); + const durations = recorder.getMeasurements("fedify.queue.task.duration"); + strictEqual(durations.length, 1); + strictEqual( + durations[0].attributes["fedify.queue.task.result"], + "aborted", + ); + }, + ); + await t.step( "on a non-native queue an aborted handler is retried, not failed", async () => { From 515be03fafaee37b0b20792738d38404a42d21e2 Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee 
<2chanhaeng@gmail.com> Date: Sat, 4 Jul 2026 10:35:49 +0000 Subject: [PATCH 5/6] Make task enqueue metrics survive partial fan-out The fedify.queue.task.enqueued counter was recorded only after the whole dispatch resolved, so on a queue without enqueueMany a batch fanned out through Promise.all could lose measurements: one rejected enqueue aborted the batch before any recording, leaving messages that had already reached the backend uncounted and skewing the enqueued-versus-completed reconciliation. The fan-out path now uses Promise.allSettled and records each message right after its individual enqueue succeeds, mirroring how the outbox delivery path counts partial successes, and rethrows the first rejection afterwards. The single-message and enqueueMany paths record the whole batch with one counter add carrying the batch size, so recordQueueTaskEnqueued() gains an optional count parameter instead of being called once per identical-attribute message. FederationImpl also gains a metrics getter wrapping the memoized getFederationMetrics() lookup, replacing the scattered per-call-site invocations. 
https://github.com/fedify-dev/fedify/pull/812#pullrequestreview-4623203224 Assisted-by: Claude Code:claude-fable-5 Assisted-by: Codex:gpt-5.5 --- packages/fedify/src/federation/metrics.ts | 3 +- packages/fedify/src/federation/middleware.ts | 53 +++++------ .../src/federation/tasks/enqueue.test.ts | 89 ++++++++++++++++++- .../fedify/src/federation/tasks/enqueue.ts | 52 +++++++---- 4 files changed, 154 insertions(+), 43 deletions(-) diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 358fe18e8..832c37d61 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -1011,10 +1011,11 @@ class FederationMetrics { recordQueueTaskEnqueued( common: QueueTaskCommonAttributes, attempt: number, + count = 1, ): void { const attributes = buildQueueTaskAttributes(common); attributes["fedify.queue.task.attempt"] = attempt; - this.queueTaskEnqueued.add(1, attributes); + this.queueTaskEnqueued.add(count, attributes); } recordQueueTaskStarted(common: QueueTaskCommonAttributes): void { diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 433362eb5..35e998441 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -955,6 +955,10 @@ export class FederationImpl return meterProvider; } + get metrics() { + return getFederationMetrics(this.meterProvider); + } + #registerQueueDepthGauge(meterProvider: MeterProvider): void { if (meterProvider === this.#queueDepthGaugeMeterProvider) return; registerQueueDepthGauge(meterProvider, this.#queueDepthGaugeEntries, { @@ -1031,7 +1035,6 @@ export class FederationImpl context.active(), message.traceContext, ); - const meter = getFederationMetrics(this.meterProvider); return withContext({ messageId: message.id }, async () => { if (message.type === "fanout") { const common: QueueTaskCommonAttributes = { @@ -1059,8 +1062,8 @@ export class 
FederationImpl message.activityId, ); } - meter.recordQueueTaskStarted(common); - meter.incrementQueueTaskInFlight(common); + this.metrics.recordQueueTaskStarted(common); + this.metrics.incrementQueueTaskInFlight(common); const startedAt = performance.now(); let outcome: QueueTaskResult = "completed"; try { @@ -1076,12 +1079,12 @@ export class FederationImpl } throw e; } finally { - meter.recordQueueTaskOutcome( + this.metrics.recordQueueTaskOutcome( common, outcome, getDurationMs(startedAt), ); - meter.decrementQueueTaskInFlight(common); + this.metrics.decrementQueueTaskInFlight(common); span.end(); } }, @@ -1115,8 +1118,8 @@ export class FederationImpl message.activityId, ); } - meter.recordQueueTaskStarted(common); - meter.incrementQueueTaskInFlight(common); + this.metrics.recordQueueTaskStarted(common); + this.metrics.incrementQueueTaskInFlight(common); const startedAt = performance.now(); let outcome: QueueTaskResult = "completed"; try { @@ -1132,12 +1135,12 @@ export class FederationImpl } throw e; } finally { - meter.recordQueueTaskOutcome( + this.metrics.recordQueueTaskOutcome( common, outcome, getDurationMs(startedAt), ); - meter.decrementQueueTaskInFlight(common); + this.metrics.decrementQueueTaskInFlight(common); span.end(); } }, @@ -1163,8 +1166,8 @@ export class FederationImpl return await withContext( { traceId: spanCtx.traceId, spanId: spanCtx.spanId }, async () => { - meter.recordQueueTaskStarted(common); - meter.incrementQueueTaskInFlight(common); + this.metrics.recordQueueTaskStarted(common); + this.metrics.incrementQueueTaskInFlight(common); const startedAt = performance.now(); let outcome: QueueTaskResult = "completed"; try { @@ -1187,12 +1190,12 @@ export class FederationImpl } throw e; } finally { - meter.recordQueueTaskOutcome( + this.metrics.recordQueueTaskOutcome( common, outcome, getDurationMs(startedAt), ); - meter.decrementQueueTaskInFlight(common); + this.metrics.decrementQueueTaskInFlight(common); span.end(); } }, @@ -1221,8 +1224,8 @@ 
export class FederationImpl return await withContext( { traceId: spanCtx.traceId, spanId: spanCtx.spanId }, async () => { - meter.recordQueueTaskStarted(common); - meter.incrementQueueTaskInFlight(common); + this.metrics.recordQueueTaskStarted(common); + this.metrics.incrementQueueTaskInFlight(common); const startedAt = performance.now(); const recordOutcome = ( outcome: QueueTaskResult, @@ -1239,7 +1242,7 @@ export class FederationImpl ...(error == null ? {} : { message: String(error) }), }); } - meter.recordQueueTaskOutcome( + this.metrics.recordQueueTaskOutcome( common, outcome, getDurationMs(startedAt), @@ -1262,7 +1265,7 @@ export class FederationImpl else recordOutcome("failed", "handler", e); throw e; } finally { - meter.decrementQueueTaskInFlight(common); + this.metrics.decrementQueueTaskInFlight(common); span.end(); } }, @@ -1388,7 +1391,7 @@ export class FederationImpl delay: clampNegativeDelay(delay), orderingKey: message.orderingKey, }); - getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + this.metrics.recordQueueTaskEnqueued( { role: "outbox", queue: outboxQueue, @@ -1667,7 +1670,7 @@ export class FederationImpl if ( isPermanentFailure ) { - getFederationMetrics(this.meterProvider).recordPermanentFailure( + this.metrics.recordPermanentFailure( error.inbox, error.statusCode, ); @@ -1767,7 +1770,7 @@ export class FederationImpl orderingKey: message.orderingKey, }, ); - getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + this.metrics.recordQueueTaskEnqueued( { role: "outbox", queue: outboxQueue, @@ -1911,7 +1914,7 @@ export class FederationImpl }, ); if (activityType != null) { - getFederationMetrics(this.meterProvider) + this.metrics .recordQueueTaskEnqueued( { role: "inbox", @@ -2135,7 +2138,7 @@ export class FederationImpl activity, ); } finally { - getFederationMetrics(this.meterProvider) + this.metrics .recordInboxProcessingDuration( activityType, getDurationMs(started), @@ -2248,7 +2251,7 @@ export class 
FederationImpl delay: clampNegativeDelay(delay), orderingKey: message.orderingKey, }); - getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + this.metrics.recordQueueTaskEnqueued( { role: "task", queue, taskName: message.taskName }, retryMessage.attempt, ); @@ -2636,7 +2639,7 @@ export class FederationImpl response.headers.set("Vary", "Accept"); } } catch (error) { - getFederationMetrics(this.meterProvider) + this.metrics .recordHttpServerRequest( request.method, metricState.endpoint ?? "error", @@ -2655,7 +2658,7 @@ export class FederationImpl ); throw error; } - getFederationMetrics(this.meterProvider).recordHttpServerRequest( + this.metrics.recordHttpServerRequest( request.method, metricState.endpoint ?? "error", getDurationMs(metricStart), diff --git a/packages/fedify/src/federation/tasks/enqueue.test.ts b/packages/fedify/src/federation/tasks/enqueue.test.ts index 83bfdc70a..4ec9112da 100644 --- a/packages/fedify/src/federation/tasks/enqueue.test.ts +++ b/packages/fedify/src/federation/tasks/enqueue.test.ts @@ -1,4 +1,4 @@ -import { test } from "@fedify/fixture"; +import { createTestMeterProvider, test } from "@fedify/fixture"; import { configure, type LogRecord, reset } from "@logtape/logtape"; import { delay } from "es-toolkit"; import { deepStrictEqual, ok, rejects, strictEqual } from "node:assert/strict"; @@ -284,6 +284,36 @@ test("enqueueTasks() validation and dispatch", async (t) => { }, ); + await t.step( + "enqueueTaskMany() records one enqueue metric for an enqueueMany batch", + async () => { + const [meterProvider, recorder] = createTestMeterProvider(); + const queue = new MockQueue({ supportsEnqueueMany: true }); + const federation = createFederation({ + ...baseOptions, + meterProvider, + queue: { task: queue }, + }); + const task = federation.defineTask("bulk-metric", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await 
ctx.enqueueTaskMany(task, ["a", "b", "c"]); + + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + strictEqual(enqueued.length, 1); + strictEqual(enqueued[0].value, 3); + strictEqual(enqueued[0].attributes["fedify.queue.role"], "task"); + strictEqual(enqueued[0].attributes["fedify.task.name"], "bulk-metric"); + strictEqual(enqueued[0].attributes["fedify.queue.task.attempt"], 0); + }, + ); + await t.step( "enqueueTaskMany() falls back to parallel enqueues", async () => { @@ -305,6 +335,63 @@ test("enqueueTasks() validation and dispatch", async (t) => { }, ); + await t.step( + "enqueueTaskMany() records fan-out successes before a partial failure", + async () => { + class PartiallyFailingQueue implements MessageQueue { + readonly enqueued: TaskMessage[] = []; + #calls = 0; + + enqueue(message: TaskMessage): Promise { + this.#calls++; + if (this.#calls === 2) { + return Promise.reject(new Error("second enqueue failed")); + } + this.enqueued.push(message); + return Promise.resolve(); + } + + listen(): Promise { + return Promise.resolve(); + } + } + + const [meterProvider, recorder] = createTestMeterProvider(); + const queue = new PartiallyFailingQueue(); + const federation = createFederation({ + ...baseOptions, + meterProvider, + queue: { task: queue }, + }); + const task = federation.defineTask("partial-fanout", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await rejects( + () => ctx.enqueueTaskMany(task, ["a", "b", "c"]), + { message: /second enqueue failed/ }, + ); + + strictEqual(queue.enqueued.length, 2); + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + strictEqual(enqueued.length, 2); + for (const measurement of enqueued) { + strictEqual(measurement.value, 1); + strictEqual(measurement.attributes["fedify.queue.role"], "task"); + strictEqual( + measurement.attributes["fedify.task.name"], + "partial-fanout", + ); 
+ strictEqual(measurement.attributes["fedify.queue.task.attempt"], 0); + } + }, + ); + await t.step( "enqueueTaskMany() with no payloads touches no queue", async () => { diff --git a/packages/fedify/src/federation/tasks/enqueue.ts b/packages/fedify/src/federation/tasks/enqueue.ts index acdd9dae0..5531f2e2f 100644 --- a/packages/fedify/src/federation/tasks/enqueue.ts +++ b/packages/fedify/src/federation/tasks/enqueue.ts @@ -7,7 +7,7 @@ * @module */ import { getLogger } from "@logtape/logtape"; -import { context, propagation } from "@opentelemetry/api"; +import { context, type MeterProvider, propagation } from "@opentelemetry/api"; import type { KvKey } from "../kv.ts"; import { getFederationMetrics } from "../metrics.ts"; import type { FederationImpl } from "../middleware.ts"; @@ -96,20 +96,19 @@ const enqueueTasks = ( ctx.federation._startQueueInternal(ctx.data); } try { - await dispatch(queue, messages, { - delay: getDurationIfDefined(options.delay), - orderingKey: options.orderingKey, - deduplicationKey: claim.forwardedDeduplicationKey, - }); - // Counted only after a genuine dispatch: a dedup skip returns before - // this, and a failed dispatch throws into the rollback below. 
- const meter = getFederationMetrics(ctx.federation.meterProvider); - for (const message of messages) { - meter.recordQueueTaskEnqueued( - { role: "task", queue, taskName: task.name }, - message.attempt, - ); - } + await dispatch( + queue, + messages, + { + delay: getDurationIfDefined(options.delay), + orderingKey: options.orderingKey, + deduplicationKey: claim.forwardedDeduplicationKey, + }, + { + meterProvider: ctx.federation.meterProvider, + taskName: task.name, + }, + ); } catch (error) { if (claim.rollback != null) { try { @@ -261,7 +260,12 @@ async function dispatch( orderingKey?: string; deduplicationKey?: string; }, + { meterProvider, taskName }: { + meterProvider: MeterProvider | undefined; + taskName: string; + }, ): Promise { + const metrics = getFederationMetrics(meterProvider); if (messages.length === 1) { await queue.enqueue(messages[0], options); } else if (queue.enqueueMany != null) { @@ -271,8 +275,24 @@ async function dispatch( delay: options.delay, orderingKey: options.orderingKey, }; - await Promise.all(messages.map((m) => queue.enqueue(m, fanoutOptions))); + const settled = await Promise.allSettled( + messages.map(async (message) => { + await queue.enqueue(message, fanoutOptions); + metrics.recordQueueTaskEnqueued( + { role: "task", queue, taskName }, + message.attempt, + ); + }), + ); + const rejected = settled.find((result) => result.status === "rejected"); + if (rejected != null) throw rejected.reason; + return; } + metrics.recordQueueTaskEnqueued( + { role: "task", queue, taskName }, + messages[0].attempt, + messages.length, + ); } /** From e013d9600547ef834d8140757440863152a2576e Mon Sep 17 00:00:00 2001 From: ChanHaeng Lee <2chanhaeng@gmail.com> Date: Sat, 4 Jul 2026 10:44:24 +0000 Subject: [PATCH 6/6] Fix docs and versions Version errors were found by `Claude Code:fable-5` https://github.com/fedify-dev/fedify/pull/812#discussion_r3517806147 --- CHANGES.md | 2 +- docs/manual/tasks.md | 2 +- packages/fedify/src/federation/metrics.ts | 4
++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index c5027eaf1..70254e59e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -45,7 +45,7 @@ Version 2.4.0 `FederationOptions.taskDeduplicationTtl` and `FederationOptions.taskDeduplicationFallback` options. - [[#206], [#797], [#798], [#799] [#803], [#806], [#812] by ChanHaeng Lee] + [[#206], [#797], [#798], [#799], [#803], [#806], [#812] by ChanHaeng Lee] [Standard Schema]: https://standardschema.dev/ [#206]: https://github.com/fedify-dev/fedify/issues/206 diff --git a/docs/manual/tasks.md b/docs/manual/tasks.md index 96c90fabe..df9e9ea9b 100644 --- a/docs/manual/tasks.md +++ b/docs/manual/tasks.md @@ -352,7 +352,7 @@ collapsed onto one message. Observability ------------- -*Task-specific telemetry is available since Fedify 2.3.0.* +*Task-specific telemetry is available since Fedify 2.4.0.* Each task the worker dequeues runs inside a `fedify.task` [OpenTelemetry] span (a *consumer* span, since tasks are not part of ActivityPub it is namespaced diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 832c37d61..696eb597b 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -95,7 +95,7 @@ export interface QueueTaskCommonAttributes { /** * The registered name of a custom background task, emitted as the * `fedify.task.name` attribute. Set only for the `"task"` role. - * @since 2.3.0 + * @since 2.4.0 */ taskName?: string; } @@ -225,7 +225,7 @@ export type HttpSignatureMetricFailureReason = * - `validation`: the deserialized payload failed schema validation. * - `unknown_task`: the task name has no registered handler. * - `handler`: the registered handler threw. - * @since 2.3.0 + * @since 2.4.0 */ export type QueueTaskFailureReason = | "deserialization"