diff --git a/AGENTS.md b/AGENTS.md index 7f94255ee..35c48b786 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -145,11 +145,29 @@ Development workflow the editor/LSP after editing YAML. - **Building Packages**: All packages are built automatically as part of setup. Run `mise run build` to rebuild everything, or - `mise run prepare-each ` to rebuild just one (without the `@fedify/` - prefix). - - **Checking Code**: Run `mise run check` before committing. - - **Running Tests**: Use `mise run test:deno` for Deno tests or - `mise run test` for all environments. + `mise run prepare-each ` to rebuild specific packages (without the + `@fedify/` prefix). + - **Checking Code**: Run `mise run check` before committing, or run + `mise run check-each ` to check specific packages. If any issues from + `check:fmt`, `check:lint` or `check:md`, are found, refers + **Formatting and Linting** section. + - **Formatting and Linting**: Run `mise run fmt` to format all code and docs. + - **Running Tests**: + While testing is certainly important, blindly running every test suite every + time is inefficient. Since Deno executes TS source code directly, it doesn't + waste resources on builds. Therefore, during development, run + `mise run test:deno {TEST_PATH} --filter ` for most tests that + are independent of the runtime. If the test is dependent on a specific + runtime other than Deno, replace `test:deno` with `test:node` or `test:bun`. + Once development is complete, run `mise run test-each ` to test the + modified packages (without the `@fedify/` prefix). + Finally, when ready for deployment, run `mise run test` to execute the + whole codebase-wide tests. + - `mise run test`: Executes all the tests in every runtime. + - `mise run test:`: + Executes all the tests by the runtime. + - `mise run test-each `: Executes tests in packages that include + `pkgs` in every runtime (without the `@fedify/` prefix). For detailed contribution guidelines, see *CONTRIBUTING.md*. diff --git a/CHANGES.md b/CHANGES.md index 2fd7300ce..70254e59e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -45,14 +45,16 @@ Version 2.4.0 `FederationOptions.taskDeduplicationTtl` and `FederationOptions.taskDeduplicationFallback` options. - [[#206], [#797], [#798], [#803], [#806] by ChanHaeng Lee] + [[#206], [#797], [#798], [#799], [#803], [#806], [#812] by ChanHaeng Lee] [Standard Schema]: https://standardschema.dev/ [#206]: https://github.com/fedify-dev/fedify/issues/206 [#797]: https://github.com/fedify-dev/fedify/issues/797 [#798]: https://github.com/fedify-dev/fedify/issues/798 +[#799]: https://github.com/fedify-dev/fedify/issues/799 [#803]: https://github.com/fedify-dev/fedify/pull/803 [#806]: https://github.com/fedify-dev/fedify/pull/806 +[#812]: https://github.com/fedify-dev/fedify/pull/812 Version 2.3.1 diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index a25c1eec8..94526d8b1 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -226,6 +226,7 @@ spans: | `activitypub.fetch_document` | Client | Fetches a remote JSON-LD document. | | `activitypub.send_activity` | Client | Sends the ActivityPub activity. | | `activitypub.verify_key_ownership` | Internal | Verifies actor ownership of a key. | +| `fedify.task` | Consumer | Dequeues a custom background task to process. | | `http_signatures.sign` | Internal | Signs the HTTP request. | | `http_signatures.verify` | Internal | Verifies the HTTP request signature. | | `ld_signatures.sign` | Internal | Makes the Linked Data signature. | @@ -364,7 +365,7 @@ Fedify records the following OpenTelemetry metrics: | `webfinger.handle.duration` | Histogram | `ms` | Measures inbound WebFinger request handling duration. | | `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. | | `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. | -| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, and fanout tasks Fedify enqueued. | +| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, fanout, and custom background tasks Fedify enqueued. | | `fedify.queue.task.started` | Counter | `{task}` | Counts queue tasks Fedify began processing as a worker. | | `fedify.queue.task.completed` | Counter | `{task}` | Counts queue tasks Fedify finished processing without throwing. | | `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. | @@ -799,11 +800,20 @@ Fedify records the following OpenTelemetry metrics: `fedify.queue.task.enqueued`, `fedify.queue.task.started`, `fedify.queue.task.completed`, `fedify.queue.task.failed`, and `fedify.queue.task.duration` -: `fedify.queue.role` (`inbox`, `outbox`, or `fanout`) is always present. +: `fedify.queue.role` (`inbox`, `outbox`, `fanout`, or `task`) is always + present. `fedify.queue.backend` is the queue implementation's constructor name (for example `RedisMessageQueue`) when available; it is omitted for queues whose constructor is the plain `Object` (for example, - `MessageQueue` instances built from an object literal). + `MessageQueue` instances built from an object literal). For a custom + background task (`role=task`) the backend reflects the queue actually used + after routing, including the `outboxQueue` fallback, and `fedify.task.name` + carries the registered task name—it is omitted for an `unknown_task` drop, + whose name is wire-derived, so task-metric cardinality stays bounded to the + registered names. A failed task outcome + (`fedify.queue.task.result=failed`) additionally carries + `fedify.task.failure_reason`, one of `deserialization`, `validation`, + `unknown_task`, or `handler`. `fedify.queue.native_retrial` reflects the queue backend's `nativeRetrial` flag when set on the queue. `activitypub.activity.type` is recorded whenever Fedify knows the activity type for the queued message; for inbox @@ -814,22 +824,29 @@ Fedify records the following OpenTelemetry metrics: from initial enqueues. `fedify.queue.task.completed`, `fedify.queue.task.failed`, and `fedify.queue.task.duration` carry `fedify.queue.task.result`, which is `completed` when processing returned - without throwing, `failed` when the worker re-threw a non-abort error, and - `aborted` when the worker re-threw an `AbortError` (for example, because a - graceful-shutdown `AbortSignal` interrupted processing). When the queue - backend does not declare `nativeRetrial`, Fedify catches inbox listener and - outbox delivery errors itself; if its retry policy still allows another - attempt, it schedules a retry by re-enqueuing the message and returns from - the worker without re-throwing, so the worker boundary records - `result=completed`. When the retry policy gives up, the worker also - returns normally (`result=completed`) without scheduling a retry. - Outbox-side activity failures remain observable through the - `activitypub.delivery.*` metrics and the `activitypub.delivery.failed` - span event, and any retry attempt (inbox or outbox) appears as a - `fedify.queue.task.enqueued` measurement with a non-zero - `fedify.queue.task.attempt`. Inbox listener errors that the retry policy - abandons are visible through error logs and the inbox span's error status, - but not through a dedicated metric. + without throwing, `failed` when processing did not succeed (for inbox and + outbox, the worker re-threw a non-abort error; for a custom task, either + the payload was dropped—`deserialization`, `validation`, or + `unknown_task`—in which case the message is still acked but the outcome + is recorded as `failed` with a `fedify.task.failure_reason`, or the + handler threw and the retry policy declined another attempt, recorded + with the `handler` reason—a handler error folded into a scheduled retry + records `completed` instead), and `aborted` when the worker re-threw an + `AbortError` (for example, because a graceful-shutdown `AbortSignal` + interrupted processing) or when an aborted custom task attempt was + abandoned without a retry. When the queue backend does not declare + `nativeRetrial`, Fedify catches inbox listener and outbox delivery errors + itself; if its retry policy still allows another attempt, it schedules a + retry by re-enqueuing the message and returns from the worker without + re-throwing, so the worker boundary records `result=completed`. When the + retry policy gives up, the worker also returns normally + (`result=completed`) without scheduling a retry. Outbox-side activity + failures remain observable through the `activitypub.delivery.*` metrics and + the `activitypub.delivery.failed` span event, and any retry attempt (inbox + or outbox) appears as a `fedify.queue.task.enqueued` measurement with a + non-zero `fedify.queue.task.attempt`. Inbox listener errors that the retry + policy abandons are visible through error logs and the inbox span's error + status, but not through a dedicated metric. `fedify.queue.task.in_flight` : `fedify.queue.role` and `fedify.queue.backend` (when available), plus @@ -939,79 +956,82 @@ for ActivityPub as of November 2024. However, Fedify provides a set of semantic [attributes] for ActivityPub. The following table shows the semantic attributes for ActivityPub: -| Attribute | Type | Description | Example | -| -------------------------------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------- | -| `activitypub.activity.id` | string | The URI of the activity object. | `"https://example.com/activity/1"` | -| `activitypub.activity.type` | string[] | The qualified URI(s) of the activity type(s). | `["https://www.w3.org/ns/activitystreams#Create"]` | -| `activitypub.activity.to` | string[] | The URI(s) of the recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | -| `activitypub.activity.cc` | string[] | The URI(s) of the carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | -| `activitypub.activity.bto` | string[] | The URI(s) of the blind recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | -| `activitypub.activity.bcc` | string[] | The URI(s) of the blind carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | -| `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` | -| `activitypub.delivery.attempt` | int | The zero-based delivery attempt number for a queued outgoing activity. | `0` | -| `activitypub.delivery.permanent_failure` | boolean | Whether an outgoing delivery failure will be abandoned instead of retried. | `true` | -| `activitypub.circuit_breaker.previous_state` | string | Previous queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"closed"` | -| `activitypub.circuit_breaker.state` | string | Current queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"open"` | -| `activitypub.processing.result` | string | Lifecycle outcome of an inbox or outbox activity: `queued`, `processed`, `retried`, `rejected`, or `abandoned`. | `"retried"` | -| `activitypub.actor.discovery.result` | string | Terminal outcome of `getActorHandle()`: `resolved`, `not_found`, or `error`. | `"resolved"` | -| `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` | -| `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` | -| `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` | -| `activitypub.key.id` | string | The URI of the cryptographic key being verified. | `"https://example.com/actor/1#main-key"` | -| `activitypub.key_ownership.method` | string | The method used to verify key ownership (`owner_id` or `actor_fetch`). | `"actor_fetch"` | -| `activitypub.key_ownership.verified` | boolean | Whether the key ownership was successfully verified. | `true` | -| `activitypub.collection.id` | string | The URI of the collection object. | `"https://example.com/collection/1"` | -| `activitypub.collection.kind` | string | The bounded collection kind: `inbox`, `outbox`, `following`, `followers`, `liked`, `featured`, `featured_tags`, or `custom`. | `"followers"` | -| `activitypub.collection.page` | boolean | Whether the collection request targets a cursor page rather than the collection object. | `false` | -| `activitypub.collection.result` | string | Terminal collection request outcome: `served`, `not_found`, `not_acceptable`, `unauthorized`, or `error`. | `"served"` | -| `activitypub.collection.type` | string[] | The qualified URI(s) of the collection type(s). | `["https://www.w3.org/ns/activitystreams#OrderedCollection"]` | -| `activitypub.collection.total_items` | int | The total number of items in the collection. | `42` | -| `activitypub.object.id` | string | The URI of the object or the object enclosed by the activity. | `"https://example.com/object/1"` | -| `activitypub.object.type` | string[] | The qualified URI(s) of the object type(s). | `["https://www.w3.org/ns/activitystreams#Note"]` | -| `activitypub.object.in_reply_to` | string[] | The URI(s) of the original object to which the object reply. | `["https://example.com/object/1"]` | -| `activitypub.inboxes` | int | The number of inboxes the activity is sent to. | `12` | -| `activitypub.remote.host` | string | The host of the remote ActivityPub server, including any non-default port. | `"example.com:8443"` | -| `activitypub.shared_inbox` | boolean | Whether the activity is sent to the shared inbox. | `true` | -| `docloader.context_url` | string | The URL of the JSON-LD context document (if provided via Link header). | `"https://www.w3.org/ns/activitystreams"` | -| `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | -| `fedify.actor.identifier` | string | The identifier of the actor. | `"1"` | -| `fedify.endpoint` | string | The bounded endpoint category that classified an inbound HTTP request handled by `Federation.fetch()`. | `"actor"` | -| `fedify.federation.instance_id` | string | Opaque per-Federation instance identifier used to distinguish queue depth series on a shared `MeterProvider`. | `"fedify-1"` | -| `fedify.route.template` | string | The matched URI Template, with parameter names (not values). | `"/users/{identifier}"` | -| `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | -| `fedify.object.type` | string | The URI of the object type. | `"https://www.w3.org/ns/activitystreams#Note"` | -| `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` | -| `fedify.collection.dispatcher` | string | The collection dispatcher family: `built_in` or `custom`. | `"built_in"` | -| `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | -| `fedify.collection.items` | number | The number of materialized items in the collection response or page. It can be less than the total items. | `10` | -| `fedify.queue.role` | string | The Fedify queue role: `inbox`, `outbox`, `fanout`, or `shared` for queue depth rows where one queue backs multiple roles. | `"outbox"` | -| `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). | `"RedisMessageQueue"` | -| `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` | -| `fedify.queue.depth.state` | string | Queue depth count kind: `queued`, `ready`, or `delayed`. | `"queued"` | -| `fedify.queue.roles` | string | Comma-separated queue roles when one queue instance backs multiple roles. | `"fanout,inbox,outbox"` | -| `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` | -| `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` | -| `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | -| `http.response.status_code` | int | The HTTP response status code. | `200` | -| `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `http_signatures.algorithm` | string | The algorithm of the HTTP request signature. | `"rsa-sha256"` | -| `http_signatures.key_id` | string | The public key ID of the HTTP request signature. | `"https://example.com/actor/1#main-key"` | -| `http_signatures.verified` | boolean | Whether the HTTP request signature was verified successfully. | `false` | -| `http_signatures.failure_reason` | string | Why HTTP signature verification failed (`noSignature`, `invalidSignature`, or `keyFetchError`). | `"keyFetchError"` | -| `http_signatures.key_fetch_status` | int | The HTTP status code from a failed signing-key fetch, when available. | `410` | -| `http_signatures.key_fetch_error` | string | The error type from a non-HTTP signing-key fetch failure, when available. | `"TypeError"` | -| `http_signatures.digest.{algorithm}` | string | The digest of the HTTP request body in hexadecimal. The `{algorithm}` is the digest algorithm (e.g., `sha`, `sha-256`). | `"d41d8cd98f00b204e9800998ecf8427e"` | -| `ld_signatures.key_id` | string | The public key ID of the Linked Data signature. | `"https://example.com/actor/1#main-key"` | -| `ld_signatures.signature` | string | The signature of the Linked Data in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `ld_signatures.type` | string | The algorithm of the Linked Data signature. | `"RsaSignature2017"` | -| `object_integrity_proofs.cryptosuite` | string | The cryptographic suite of the object integrity proof. | `"eddsa-jcs-2022"` | -| `object_integrity_proofs.key_id` | string | The public key ID of the object integrity proof. | `"https://example.com/actor/1#main-key"` | -| `object_integrity_proofs.signature` | string | The integrity proof of the object in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `url.full` | string | The full URL being fetched by the document loader. | `"https://example.com/actor/1"` | -| `webfinger.handle.result` | string | Terminal outcome of an incoming WebFinger request: `resolved`, `invalid`, `not_found`, `tombstoned`, or `error`. | `"resolved"` | -| `webfinger.lookup.result` | string | Terminal outcome of an outgoing WebFinger lookup: `found`, `not_found`, `invalid`, `network_error`, or `error`. | `"found"` | -| `webfinger.resource` | string | The queried resource URI. | `"acct:fedify@hollo.social"` | -| `webfinger.resource.scheme` | string | The scheme of the queried resource URI. Metric attribute is bucketed to `acct`, `http`, `https`, `mailto`, or `other`. | `"acct"` | +| Attribute | Type | Description | Example | +| -------------------------------------------- | -------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------- | +| `activitypub.activity.id` | string | The URI of the activity object. | `"https://example.com/activity/1"` | +| `activitypub.activity.type` | string[] | The qualified URI(s) of the activity type(s). | `["https://www.w3.org/ns/activitystreams#Create"]` | +| `activitypub.activity.to` | string[] | The URI(s) of the recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | +| `activitypub.activity.cc` | string[] | The URI(s) of the carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | +| `activitypub.activity.bto` | string[] | The URI(s) of the blind recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | +| `activitypub.activity.bcc` | string[] | The URI(s) of the blind carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | +| `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` | +| `activitypub.delivery.attempt` | int | The zero-based delivery attempt number for a queued outgoing activity. | `0` | +| `activitypub.delivery.permanent_failure` | boolean | Whether an outgoing delivery failure will be abandoned instead of retried. | `true` | +| `activitypub.circuit_breaker.previous_state` | string | Previous queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"closed"` | +| `activitypub.circuit_breaker.state` | string | Current queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"open"` | +| `activitypub.processing.result` | string | Lifecycle outcome of an inbox or outbox activity: `queued`, `processed`, `retried`, `rejected`, or `abandoned`. | `"retried"` | +| `activitypub.actor.discovery.result` | string | Terminal outcome of `getActorHandle()`: `resolved`, `not_found`, or `error`. | `"resolved"` | +| `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` | +| `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` | +| `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` | +| `activitypub.key.id` | string | The URI of the cryptographic key being verified. | `"https://example.com/actor/1#main-key"` | +| `activitypub.key_ownership.method` | string | The method used to verify key ownership (`owner_id` or `actor_fetch`). | `"actor_fetch"` | +| `activitypub.key_ownership.verified` | boolean | Whether the key ownership was successfully verified. | `true` | +| `activitypub.collection.id` | string | The URI of the collection object. | `"https://example.com/collection/1"` | +| `activitypub.collection.kind` | string | The bounded collection kind: `inbox`, `outbox`, `following`, `followers`, `liked`, `featured`, `featured_tags`, or `custom`. | `"followers"` | +| `activitypub.collection.page` | boolean | Whether the collection request targets a cursor page rather than the collection object. | `false` | +| `activitypub.collection.result` | string | Terminal collection request outcome: `served`, `not_found`, `not_acceptable`, `unauthorized`, or `error`. | `"served"` | +| `activitypub.collection.type` | string[] | The qualified URI(s) of the collection type(s). | `["https://www.w3.org/ns/activitystreams#OrderedCollection"]` | +| `activitypub.collection.total_items` | int | The total number of items in the collection. | `42` | +| `activitypub.object.id` | string | The URI of the object or the object enclosed by the activity. | `"https://example.com/object/1"` | +| `activitypub.object.type` | string[] | The qualified URI(s) of the object type(s). | `["https://www.w3.org/ns/activitystreams#Note"]` | +| `activitypub.object.in_reply_to` | string[] | The URI(s) of the original object to which the object reply. | `["https://example.com/object/1"]` | +| `activitypub.inboxes` | int | The number of inboxes the activity is sent to. | `12` | +| `activitypub.remote.host` | string | The host of the remote ActivityPub server, including any non-default port. | `"example.com:8443"` | +| `activitypub.shared_inbox` | boolean | Whether the activity is sent to the shared inbox. | `true` | +| `docloader.context_url` | string | The URL of the JSON-LD context document (if provided via Link header). | `"https://www.w3.org/ns/activitystreams"` | +| `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | +| `fedify.actor.identifier` | string | The identifier of the actor. | `"1"` | +| `fedify.endpoint` | string | The bounded endpoint category that classified an inbound HTTP request handled by `Federation.fetch()`. | `"actor"` | +| `fedify.federation.instance_id` | string | Opaque per-Federation instance identifier used to distinguish queue depth series on a shared `MeterProvider`. | `"fedify-1"` | +| `fedify.route.template` | string | The matched URI Template, with parameter names (not values). | `"/users/{identifier}"` | +| `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | +| `fedify.object.type` | string | The URI of the object type. | `"https://www.w3.org/ns/activitystreams#Note"` | +| `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` | +| `fedify.collection.dispatcher` | string | The collection dispatcher family: `built_in` or `custom`. | `"built_in"` | +| `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | +| `fedify.collection.items` | number | The number of materialized items in the collection response or page. It can be less than the total items. | `10` | +| `fedify.queue.role` | string | The Fedify queue role: `inbox`, `outbox`, `fanout`, or `task`; `shared` additionally appears on queue depth rows where one queue backs multiple roles. | `"outbox"` | +| `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). | `"RedisMessageQueue"` | +| `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` | +| `fedify.queue.depth.state` | string | Queue depth count kind: `queued`, `ready`, or `delayed`. | `"queued"` | +| `fedify.queue.roles` | string | Comma-separated queue roles when one queue instance backs multiple roles. | `"fanout,inbox,outbox"` | +| `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` | +| `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` | +| `fedify.task.name` | string | The name of a custom background task: always on the `fedify.task` span; on the task's `fedify.queue.task.*` run metrics only for a registered task (omitted for an `unknown_task` drop, keeping cardinality bounded). | `"sendDigest"` | +| `fedify.task.attempt` | int | The zero-based attempt number of a custom background task, on the `fedify.task` span. | `0` | +| `fedify.task.failure_reason` | string | Why a custom background task failed: `deserialization`, `validation`, `unknown_task`, or `handler`. Set only on a terminal failure. | `"validation"` | +| `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | +| `http.response.status_code` | int | The HTTP response status code. | `200` | +| `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `http_signatures.algorithm` | string | The algorithm of the HTTP request signature. | `"rsa-sha256"` | +| `http_signatures.key_id` | string | The public key ID of the HTTP request signature. | `"https://example.com/actor/1#main-key"` | +| `http_signatures.verified` | boolean | Whether the HTTP request signature was verified successfully. | `false` | +| `http_signatures.failure_reason` | string | Why HTTP signature verification failed (`noSignature`, `invalidSignature`, or `keyFetchError`). | `"keyFetchError"` | +| `http_signatures.key_fetch_status` | int | The HTTP status code from a failed signing-key fetch, when available. | `410` | +| `http_signatures.key_fetch_error` | string | The error type from a non-HTTP signing-key fetch failure, when available. | `"TypeError"` | +| `http_signatures.digest.{algorithm}` | string | The digest of the HTTP request body in hexadecimal. The `{algorithm}` is the digest algorithm (e.g., `sha`, `sha-256`). | `"d41d8cd98f00b204e9800998ecf8427e"` | +| `ld_signatures.key_id` | string | The public key ID of the Linked Data signature. | `"https://example.com/actor/1#main-key"` | +| `ld_signatures.signature` | string | The signature of the Linked Data in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `ld_signatures.type` | string | The algorithm of the Linked Data signature. | `"RsaSignature2017"` | +| `object_integrity_proofs.cryptosuite` | string | The cryptographic suite of the object integrity proof. | `"eddsa-jcs-2022"` | +| `object_integrity_proofs.key_id` | string | The public key ID of the object integrity proof. | `"https://example.com/actor/1#main-key"` | +| `object_integrity_proofs.signature` | string | The integrity proof of the object in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `url.full` | string | The full URL being fetched by the document loader. | `"https://example.com/actor/1"` | +| `webfinger.handle.result` | string | Terminal outcome of an incoming WebFinger request: `resolved`, `invalid`, `not_found`, `tombstoned`, or `error`. | `"resolved"` | +| `webfinger.lookup.result` | string | Terminal outcome of an outgoing WebFinger lookup: `found`, `not_found`, `invalid`, `network_error`, or `error`. | `"found"` | +| `webfinger.resource` | string | The queried resource URI. | `"acct:fedify@hollo.social"` | +| `webfinger.resource.scheme` | string | The scheme of the queried resource URI. Metric attribute is bucketed to `acct`, `http`, `https`, `mailto`, or `other`. | `"acct"` | [attributes]: https://opentelemetry.io/docs/specs/otel/common/#attribute [OpenTelemetry Semantic Conventions]: https://opentelemetry.io/docs/specs/semconv/ diff --git a/docs/manual/tasks.md b/docs/manual/tasks.md index 599bed2f7..df9e9ea9b 100644 --- a/docs/manual/tasks.md +++ b/docs/manual/tasks.md @@ -349,11 +349,82 @@ collapsed onto one message. > owns an atomic check. +Observability +------------- + +*Task-specific telemetry is available since Fedify 2.4.0.* + +Each task the worker dequeues runs inside a `fedify.task` [OpenTelemetry] span +(a *consumer* span, since tasks are not part of ActivityPub it is namespaced +under `fedify.` rather than `activitypub.`). The span inherits the trace +context captured at the enqueue site, so a task's processing chains to the +request or job that enqueued it—and every retry attempt chains to the same +parent. The span carries: + + - `fedify.task.name` — the registered task name. + - `fedify.task.attempt` — the zero-based attempt number; a retry re-enqueue + increments it. + - `fedify.task.failure_reason` — set only on a terminal failure, one of the + four bounded values below. + +On a terminal failure the span's status is also set to `ERROR`, so trace-based +error views surface dropped and given-up tasks together with their +`fedify.task.failure_reason`. A worker shutdown is the one exception: an +`aborted` attempt leaves the status unset, since an interruption is not a task +failure. + +Tasks also reuse the `fedify.queue.task.*` metric family (`enqueued`, +`started`, `completed`, `failed`, `duration`, `in_flight`) that the inbox, +outbox, and fanout workers already report. On a task run measurement +(`enqueued`, `started`, `completed`, `failed`, `duration`), +`fedify.queue.role` is `task` and `fedify.task.name` names the task; the +process-local `in_flight` UpDownCounter omits `fedify.task.name` so its +increments and decrements pair up cleanly. +`fedify.queue.backend` reflects the queue actually used after routing—so a task +that falls back to the `outboxQueue` (see +[Routing](#queue-routing-and-isolation)) is labeled with the outbox queue's +backend, not a task queue's. A failed outcome +also carries `fedify.task.failure_reason` on `fedify.queue.task.failed` and +`fedify.queue.task.duration`. + +The `fedify.task.failure_reason` attribute takes one of four bounded values, +mapping to the worker's dispatch decision points: + +| Value | Meaning | +| ----------------- | -------------------------------------------------- | +| `deserialization` | The wire payload could not be deserialized. | +| `validation` | The deserialized payload failed schema validation. | +| `unknown_task` | The task name has no registered handler. | +| `handler` | The registered handler threw. | + +The first three are *drops*: the payload cannot succeed by retrying, so the +worker acknowledges the message and does not re-enqueue it. Telemetry still +records these as a failed outcome with the matching reason, while the queue is +left drained—so a drop is observable without being retried. A `handler` +failure follows the configured retry policy (see +[Retries](#retry-and-error-handling)): an attempt folded into a scheduled +retry records a `completed` outcome, and only the terminal give-up records +`failed` with the `handler` reason. A worker shutdown is never counted as a +failure: an interrupted attempt carries no `fedify.task.failure_reason`—it is +recorded as an `aborted` outcome when the abort propagates (on a +`nativeRetrial` queue) or when the retry policy declines another attempt, and +otherwise folded into a scheduled retry like any handler error. + +The bounded value set keeps metric cardinality finite: a metric's task name is +a registered, known-at-startup value, never derived from message content—an +`unknown_task` drop carries a wire-supplied name, so that name is kept off the +metrics (it still appears on the span, which does not aggregate into time +series). See the [OpenTelemetry](./opentelemetry.md) manual for the full span, +attribute, and metric reference. + +[OpenTelemetry]: https://opentelemetry.io/ + + Limitations ----------- -The current API intentionally ships without task-specific OpenTelemetry spans -and metrics, cron-style periodic scheduling, result backends, and per-task -priority. Some of these are planned as follow-ups; see the [tracking issue]. +The current API intentionally ships without cron-style periodic scheduling, +result backends, and per-task priority. Some of these are planned as +follow-ups; see the [tracking issue]. [tracking issue]: https://github.com/fedify-dev/fedify/issues/206 diff --git a/mise.toml b/mise.toml index 7ae57df3b..1fbf82a53 100644 --- a/mise.toml +++ b/mise.toml @@ -246,9 +246,9 @@ for pkg in ($env.usage_packages | split row " " | where ($it | is-not-empty)) { ''' # Testing + [tasks."test:deno"] description = "Run the test suite using Deno" -depends = ["build"] run = "deno test --check --doc --allow-all --unstable-kv --trace-leaks --parallel" [tasks."test:node"] diff --git a/packages/cli/src/startup.test.ts b/packages/cli/src/startup.test.ts index e73f5098f..f13e1170d 100644 --- a/packages/cli/src/startup.test.ts +++ b/packages/cli/src/startup.test.ts @@ -5,7 +5,10 @@ import test from "node:test"; import { fileURLToPath } from "node:url"; const packageDir = resolve(dirname(fileURLToPath(import.meta.url)), ".."); -test("CLI build keeps the init command bridge", async () => { + +test("CLI build keeps the init command bridge", { + skip: "Deno" in globalThis, +}, async () => { const entrypoint = resolve(packageDir, "dist/mod.js"); const commandBridge = resolve(packageDir, "dist/commands.js"); await access(entrypoint); diff --git a/packages/create/src/package.test.ts b/packages/create/src/package.test.ts index ba50b3660..4a753d467 100644 --- a/packages/create/src/package.test.ts +++ b/packages/create/src/package.test.ts @@ -8,6 +8,7 @@ const packageDir = resolve(dirname(fileURLToPath(import.meta.url)), ".."); test( "package.json entrypoints match built create CLI", + { skip: "Deno" in globalThis }, async () => { const packageJson = JSON.parse( await readFile(resolve(packageDir, "package.json"), "utf8"), diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 2c53ca86d..696eb597b 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -16,7 +16,7 @@ import type { MessageQueue } from "./mq.ts"; * The role of a queued task, derived from the queued message's `type` field. * @since 2.3.0 */ -export type QueueTaskRole = "fanout" | "outbox" | "inbox"; +export type QueueTaskRole = "fanout" | "outbox" | "inbox" | "task"; /** * The terminal result of a queued task processing attempt. @@ -91,6 +91,13 @@ export interface QueueTaskCommonAttributes { role: QueueTaskRole; queue?: MessageQueue; activityType?: string; + + /** + * The registered name of a custom background task, emitted as the + * `fedify.task.name` attribute. Set only for the `"task"` role. + * @since 2.4.0 + */ + taskName?: string; } /** @@ -209,6 +216,23 @@ export type HttpSignatureMetricFailureReason = | "invalidSignature" | "keyFetchError"; +/** + * The reason a custom background task terminated unsuccessfully, emitted as the + * `fedify.task.failure_reason` attribute. A small bounded set mapping to the + * worker's dispatch decision points; open to later refinement. + * + * - `deserialization`: the wire payload could not be deserialized. + * - `validation`: the deserialized payload failed schema validation. + * - `unknown_task`: the task name has no registered handler. + * - `handler`: the registered handler threw. + * @since 2.4.0 + */ +export type QueueTaskFailureReason = + | "deserialization" + | "validation" + | "unknown_task" + | "handler"; + /** * Bounded values recorded as `ld_signatures.type` on the signature * verification duration histogram. Fedify only signs and verifies @@ -987,10 +1011,11 @@ class FederationMetrics { recordQueueTaskEnqueued( common: QueueTaskCommonAttributes, attempt: number, + count = 1, ): void { const attributes = buildQueueTaskAttributes(common); attributes["fedify.queue.task.attempt"] = attempt; - this.queueTaskEnqueued.add(1, attributes); + this.queueTaskEnqueued.add(count, attributes); } recordQueueTaskStarted(common: QueueTaskCommonAttributes): void { @@ -1009,9 +1034,13 @@ class FederationMetrics { common: QueueTaskCommonAttributes, result: QueueTaskResult, durationMs: number, + failureReason?: QueueTaskFailureReason, ): void { const attributes = buildQueueTaskAttributes(common); attributes["fedify.queue.task.result"] = result; + if (failureReason != null && result === "failed") { + attributes["fedify.task.failure_reason"] = failureReason; + } if (result === "completed") { this.queueTaskCompleted.add(1, attributes); } else if (result === "failed") { @@ -1197,6 +1226,9 @@ function buildQueueTaskAttributes( if (common.activityType != null) { attributes["activitypub.activity.type"] = common.activityType; } + if (common.taskName != null) { + attributes["fedify.task.name"] = common.taskName; + } return attributes; } diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 6ff54ca91..d1e3c3882 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -10506,8 +10506,11 @@ test("createFederation() omits instrumentation when no meterProvider is set", () }); const taskCodec = new TaskCodec({ contextLoader: mockDocumentLoader }); -const decodeEnvelope = (message: TaskMessage): Promise => - taskCodec.decode(envelopeSchema, message.data); +const decodeEnvelope = async (message: TaskMessage): Promise => { + const decoded = await taskCodec.decode(envelopeSchema, message.data); + if (!decoded.ok) throw decoded.error; + return decoded.value; +}; const envelope = (title: string): Envelope => ({ note: new Note({ content: title }), title, diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 0f93c900c..35e998441 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -141,6 +141,7 @@ import { isAbortError, type QueueDepthGaugeEntry, type QueueTaskCommonAttributes, + type QueueTaskFailureReason, type QueueTaskResult, recordCircuitBreakerStateChange, recordCollectionRequest, @@ -176,6 +177,13 @@ import { import { hasMalformedKnownTemporalLiteral } from "./temporal.ts"; import { handleWebFinger } from "./webfinger.ts"; +type QueueTaskDispatchResult = + | { readonly outcome: "completed" | "aborted" } + | { + readonly outcome: "failed"; + readonly failureReason: QueueTaskFailureReason; + }; + const circuitBreakerCasWarningKvStores = new WeakSet(); let nextQueueDepthGaugeSourceId = 0; const retryAfterHttpDate = new RegExp( @@ -947,6 +955,10 @@ export class FederationImpl return meterProvider; } + get metrics() { + return getFederationMetrics(this.meterProvider); + } + #registerQueueDepthGauge(meterProvider: MeterProvider): void { if (meterProvider === this.#queueDepthGaugeMeterProvider) return; registerQueueDepthGauge(meterProvider, this.#queueDepthGaugeEntries, { @@ -1023,7 +1035,6 @@ export class FederationImpl context.active(), message.traceContext, ); - const meter = getFederationMetrics(this.meterProvider); return withContext({ messageId: message.id }, async () => { if (message.type === "fanout") { const common: QueueTaskCommonAttributes = { @@ -1051,8 +1062,8 @@ export class FederationImpl message.activityId, ); } - meter.recordQueueTaskStarted(common); - meter.incrementQueueTaskInFlight(common); + this.metrics.recordQueueTaskStarted(common); + this.metrics.incrementQueueTaskInFlight(common); const startedAt = performance.now(); let outcome: QueueTaskResult = "completed"; try { @@ -1068,12 +1079,12 @@ export class FederationImpl } throw e; } finally { - meter.recordQueueTaskOutcome( + this.metrics.recordQueueTaskOutcome( common, outcome, getDurationMs(startedAt), ); - meter.decrementQueueTaskInFlight(common); + this.metrics.decrementQueueTaskInFlight(common); span.end(); } }, @@ -1107,8 +1118,8 @@ export class FederationImpl message.activityId, ); } - meter.recordQueueTaskStarted(common); - meter.incrementQueueTaskInFlight(common); + this.metrics.recordQueueTaskStarted(common); + this.metrics.incrementQueueTaskInFlight(common); const startedAt = performance.now(); let outcome: QueueTaskResult = "completed"; try { @@ -1124,12 +1135,12 @@ export class FederationImpl } throw e; } finally { - meter.recordQueueTaskOutcome( + this.metrics.recordQueueTaskOutcome( common, outcome, getDurationMs(startedAt), ); - meter.decrementQueueTaskInFlight(common); + this.metrics.decrementQueueTaskInFlight(common); span.end(); } }, @@ -1155,8 +1166,8 @@ export class FederationImpl return await withContext( { traceId: spanCtx.traceId, spanId: spanCtx.spanId }, async () => { - meter.recordQueueTaskStarted(common); - meter.incrementQueueTaskInFlight(common); + this.metrics.recordQueueTaskStarted(common); + this.metrics.incrementQueueTaskInFlight(common); const startedAt = performance.now(); let outcome: QueueTaskResult = "completed"; try { @@ -1179,12 +1190,12 @@ export class FederationImpl } throw e; } finally { - meter.recordQueueTaskOutcome( + this.metrics.recordQueueTaskOutcome( common, outcome, getDurationMs(startedAt), ); - meter.decrementQueueTaskInFlight(common); + this.metrics.decrementQueueTaskInFlight(common); span.end(); } }, @@ -1192,7 +1203,75 @@ export class FederationImpl }, ); } else if (message.type === "task") { - await this.#listenTaskMessage(contextData, message); + const registered = this.taskDefinitions.get(message.taskName) != null; + const common: QueueTaskCommonAttributes = { + role: "task", + queue: this.resolveTaskQueue(message.taskName), + taskName: registered ? message.taskName : undefined, + }; + await tracer.startActiveSpan( + "fedify.task", + { + kind: SpanKind.CONSUMER, + attributes: { + "fedify.task.name": message.taskName, + "fedify.task.attempt": message.attempt, + }, + }, + extractedContext, + async (span) => { + const spanCtx = span.spanContext(); + return await withContext( + { traceId: spanCtx.traceId, spanId: spanCtx.spanId }, + async () => { + this.metrics.recordQueueTaskStarted(common); + this.metrics.incrementQueueTaskInFlight(common); + const startedAt = performance.now(); + const recordOutcome = ( + outcome: QueueTaskResult, + failureReason: QueueTaskFailureReason | undefined, + error?: unknown, + ): void => { + if (failureReason != null) { + span.setAttribute( + "fedify.task.failure_reason", + failureReason, + ); + span.setStatus({ + code: SpanStatusCode.ERROR, + ...(error == null ? {} : { message: String(error) }), + }); + } + this.metrics.recordQueueTaskOutcome( + common, + outcome, + getDurationMs(startedAt), + outcome === "failed" ? failureReason : undefined, + ); + }; + try { + const result = await this.#listenTaskMessage( + contextData, + message, + ); + recordOutcome( + result.outcome, + result.outcome === "failed" + ? result.failureReason + : undefined, + ); + } catch (e) { + if (isAbortError(e)) recordOutcome("aborted", undefined); + else recordOutcome("failed", "handler", e); + throw e; + } finally { + this.metrics.decrementQueueTaskInFlight(common); + span.end(); + } + }, + ); + }, + ); } }); } @@ -1312,7 +1391,7 @@ export class FederationImpl delay: clampNegativeDelay(delay), orderingKey: message.orderingKey, }); - getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + this.metrics.recordQueueTaskEnqueued( { role: "outbox", queue: outboxQueue, @@ -1591,7 +1670,7 @@ export class FederationImpl if ( isPermanentFailure ) { - getFederationMetrics(this.meterProvider).recordPermanentFailure( + this.metrics.recordPermanentFailure( error.inbox, error.statusCode, ); @@ -1691,7 +1770,7 @@ export class FederationImpl orderingKey: message.orderingKey, }, ); - getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + this.metrics.recordQueueTaskEnqueued( { role: "outbox", queue: outboxQueue, @@ -1835,7 +1914,7 @@ export class FederationImpl }, ); if (activityType != null) { - getFederationMetrics(this.meterProvider) + this.metrics .recordQueueTaskEnqueued( { role: "inbox", @@ -2059,7 +2138,7 @@ export class FederationImpl activity, ); } finally { - getFederationMetrics(this.meterProvider) + this.metrics .recordInboxProcessingDuration( activityType, getDurationMs(started), @@ -2091,7 +2170,7 @@ export class FederationImpl async #listenTaskMessage( contextData: TContextData, message: TaskMessage, - ): Promise { + ): Promise { const logger = getLogger(["fedify", "federation", "task"]); const def = this.taskDefinitions.get(message.taskName); if (def == null) { @@ -2101,30 +2180,33 @@ export class FederationImpl "dropping.", { taskName: message.taskName }, ); - return; + return { outcome: "failed", failureReason: "unknown_task" }; } const context = this.#createContext(new URL(message.baseUrl), contextData); - let data: unknown; - try { - // decode() deserializes then re-validates at the dequeue boundary - // (drift protection): a durable queue can hand a new deploy a payload - // an old deploy enqueued. - data = await context.codec.decode(def.schema, message.data); - } catch (error) { - // A malformed or incompatible payload won't succeed by retrying. - logger.error( - "Custom task {taskName} payload could not be decoded or validated; " + - "dropping:\n{error}", - { taskName: message.taskName, error }, - ); - return; + const data = await context.codec.decode(def.schema, message.data); + if (!data.ok) { + if (data.phase === "deserialization") { + logger.error( + "Custom task {taskName} payload could not be deserialized; " + + "dropping:\n{error}", + { taskName: message.taskName, error: data.error }, + ); + } else { + logger.error( + "Custom task {taskName} payload failed schema validation; " + + "dropping:\n{error}", + { taskName: message.taskName, error: data.error }, + ); + } + return { outcome: "failed", failureReason: data.phase }; } try { - await def.handler(context, data); + await def.handler(context, data.value); + return { outcome: "completed" }; } catch (error) { if (def.onError != null) { try { - await def.onError(context, error, data); + await def.onError(context, error, data.value); } catch (onErrorError) { logger.error( "onError for custom task {taskName} threw:\n{error}", @@ -2169,6 +2251,11 @@ export class FederationImpl delay: clampNegativeDelay(delay), orderingKey: message.orderingKey, }); + this.metrics.recordQueueTaskEnqueued( + { role: "task", queue, taskName: message.taskName }, + retryMessage.attempt, + ); + return { outcome: "completed" }; } else { logger.error( "Custom task {taskName} failed after {attempt} attempts; giving " + @@ -2176,6 +2263,10 @@ export class FederationImpl { taskName: message.taskName, attempt: message.attempt, error }, ); } + // A swallowed abort is a graceful interruption, not a task failure. + return isAbortError(error) + ? { outcome: "aborted" } + : { outcome: "failed", failureReason: "handler" }; } } @@ -2548,7 +2639,7 @@ export class FederationImpl response.headers.set("Vary", "Accept"); } } catch (error) { - getFederationMetrics(this.meterProvider) + this.metrics .recordHttpServerRequest( request.method, metricState.endpoint ?? "error", @@ -2567,7 +2658,7 @@ export class FederationImpl ); throw error; } - getFederationMetrics(this.meterProvider).recordHttpServerRequest( + this.metrics.recordHttpServerRequest( request.method, metricState.endpoint ?? "error", getDurationMs(metricStart), diff --git a/packages/fedify/src/federation/tasks/codec.test.ts b/packages/fedify/src/federation/tasks/codec.test.ts index fcc84de0e..d4fae0463 100644 --- a/packages/fedify/src/federation/tasks/codec.test.ts +++ b/packages/fedify/src/federation/tasks/codec.test.ts @@ -328,9 +328,10 @@ test("TaskCodec.encode() / decode()", async (t) => { const wire = await codec.encode(schema, payload); strictEqual(typeof wire, "string"); const back = await codec.decode(schema, wire); - ok(back.note instanceof Note); - strictEqual(back.note.content?.toString(), "Hi"); - strictEqual(back.title, "greeting"); + if (!back.ok) throw back.error; + ok(back.value.note instanceof Note); + strictEqual(back.value.note.content?.toString(), "Hi"); + strictEqual(back.value.title, "greeting"); }, ); @@ -342,18 +343,27 @@ test("TaskCodec.encode() / decode()", async (t) => { }); await t.step( - "decode() re-validates and rejects a drifted payload", + "decode() reports a validation phase for a drifted payload", async () => { // Encode under a permissive schema, decode under the strict one. const loose = makeSchema((_data): _data is unknown => true); const wire = await codec.encode(loose, { note: "not a note" }); - await rejects( - () => codec.decode(schema, wire), - { name: "TypeError", message: /Task data failed schema validation/ }, - ); + const result = await codec.decode(schema, wire); + strictEqual(result.ok, false); + if (result.ok) throw new Error("Expected decode to fail."); + strictEqual(result.phase, "validation"); + ok(result.error instanceof TypeError); }, ); + await t.step("decode() reports a deserialization phase", async () => { + const result = await codec.decode(schema, "not devalue"); + strictEqual(result.ok, false); + if (result.ok) throw new Error("Expected decode to fail."); + strictEqual(result.phase, "deserialization"); + ok(result.error instanceof Error); + }); + await t.step( "a non-idempotent (transforming) schema fails to round-trip", async () => { @@ -370,10 +380,11 @@ test("TaskCodec.encode() / decode()", async (t) => { }, }; const wire = await codec.encode(transforming, "hello"); - await rejects( - () => codec.decode(transforming, wire), - { name: "TypeError", message: /Task data failed schema validation/ }, - ); + const result = await codec.decode(transforming, wire); + strictEqual(result.ok, false); + if (result.ok) throw new Error("Expected decode to fail."); + strictEqual(result.phase, "validation"); + ok(result.error instanceof TypeError); }, ); }); diff --git a/packages/fedify/src/federation/tasks/codec.ts b/packages/fedify/src/federation/tasks/codec.ts index e649ce061..39ce04fd8 100644 --- a/packages/fedify/src/federation/tasks/codec.ts +++ b/packages/fedify/src/federation/tasks/codec.ts @@ -4,6 +4,14 @@ import type { TracerProvider } from "@opentelemetry/api"; import type { StandardSchemaV1 } from "@standard-schema/spec"; import { parse, stringifyAsync } from "devalue"; +type TaskCodecDecodeResult = + | { readonly ok: true; readonly value: StandardSchemaV1.InferOutput } + | { + readonly ok: false; + readonly phase: "deserialization" | "validation"; + readonly error: unknown; + }; + /** * Serializes and deserializes task payloads for the queue, preserving * `@fedify/vocab` objects across the wire by reducing them to JSON-LD and @@ -17,9 +25,8 @@ export default class TaskCodec { serialize = (data: unknown): Promise => stringifyAsync(data, { Vocab: this.#stringifyVocab }); - /** Deserializes `raw`, rebuilding any encoded vocabulary object. */ - deserialize = (raw: string): Promise => - this.#revive(new Map())(parse(raw, { Vocab: VocabHolder.from })); + deserialize = async (raw: string): Promise => + await this.#revive(new Map())(parse(raw, { Vocab: VocabHolder.from })); /** Validates `data` against `schema`, then serializes it. */ encode = async ( @@ -31,10 +38,20 @@ export default class TaskCodec { decode = async ( schema: S, raw: string, - ): Promise> => - TaskCodec.validate(schema, await this.deserialize(raw)); + ): Promise> => { + let data: unknown; + try { + data = await this.deserialize(raw); + } catch (error) { + return { ok: false, phase: "deserialization", error }; + } + try { + return { ok: true, value: await TaskCodec.validate(schema, data) }; + } catch (error) { + return { ok: false, phase: "validation", error }; + } + }; - /** Validates `data` against `schema`, returning its parsed output. */ static validate = async ( schema: S, data: unknown, diff --git a/packages/fedify/src/federation/tasks/enqueue.test.ts b/packages/fedify/src/federation/tasks/enqueue.test.ts index 83bfdc70a..4ec9112da 100644 --- a/packages/fedify/src/federation/tasks/enqueue.test.ts +++ b/packages/fedify/src/federation/tasks/enqueue.test.ts @@ -1,4 +1,4 @@ -import { test } from "@fedify/fixture"; +import { createTestMeterProvider, test } from "@fedify/fixture"; import { configure, type LogRecord, reset } from "@logtape/logtape"; import { delay } from "es-toolkit"; import { deepStrictEqual, ok, rejects, strictEqual } from "node:assert/strict"; @@ -284,6 +284,36 @@ test("enqueueTasks() validation and dispatch", async (t) => { }, ); + await t.step( + "enqueueTaskMany() records one enqueue metric for an enqueueMany batch", + async () => { + const [meterProvider, recorder] = createTestMeterProvider(); + const queue = new MockQueue({ supportsEnqueueMany: true }); + const federation = createFederation({ + ...baseOptions, + meterProvider, + queue: { task: queue }, + }); + const task = federation.defineTask("bulk-metric", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await ctx.enqueueTaskMany(task, ["a", "b", "c"]); + + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + strictEqual(enqueued.length, 1); + strictEqual(enqueued[0].value, 3); + strictEqual(enqueued[0].attributes["fedify.queue.role"], "task"); + strictEqual(enqueued[0].attributes["fedify.task.name"], "bulk-metric"); + strictEqual(enqueued[0].attributes["fedify.queue.task.attempt"], 0); + }, + ); + await t.step( "enqueueTaskMany() falls back to parallel enqueues", async () => { @@ -305,6 +335,63 @@ test("enqueueTasks() validation and dispatch", async (t) => { }, ); + await t.step( + "enqueueTaskMany() records fan-out successes before a partial failure", + async () => { + class PartiallyFailingQueue implements MessageQueue { + readonly enqueued: TaskMessage[] = []; + #calls = 0; + + enqueue(message: TaskMessage): Promise { + this.#calls++; + if (this.#calls === 2) { + return Promise.reject(new Error("second enqueue failed")); + } + this.enqueued.push(message); + return Promise.resolve(); + } + + listen(): Promise { + return Promise.resolve(); + } + } + + const [meterProvider, recorder] = createTestMeterProvider(); + const queue = new PartiallyFailingQueue(); + const federation = createFederation({ + ...baseOptions, + meterProvider, + queue: { task: queue }, + }); + const task = federation.defineTask("partial-fanout", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await rejects( + () => ctx.enqueueTaskMany(task, ["a", "b", "c"]), + { message: /second enqueue failed/ }, + ); + + strictEqual(queue.enqueued.length, 2); + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + strictEqual(enqueued.length, 2); + for (const measurement of enqueued) { + strictEqual(measurement.value, 1); + strictEqual(measurement.attributes["fedify.queue.role"], "task"); + strictEqual( + measurement.attributes["fedify.task.name"], + "partial-fanout", + ); + strictEqual(measurement.attributes["fedify.queue.task.attempt"], 0); + } + }, + ); + await t.step( "enqueueTaskMany() with no payloads touches no queue", async () => { diff --git a/packages/fedify/src/federation/tasks/enqueue.ts b/packages/fedify/src/federation/tasks/enqueue.ts index a4009aa1a..5531f2e2f 100644 --- a/packages/fedify/src/federation/tasks/enqueue.ts +++ b/packages/fedify/src/federation/tasks/enqueue.ts @@ -7,8 +7,9 @@ * @module */ import { getLogger } from "@logtape/logtape"; -import { context, propagation } from "@opentelemetry/api"; +import { context, type MeterProvider, propagation } from "@opentelemetry/api"; import type { KvKey } from "../kv.ts"; +import { getFederationMetrics } from "../metrics.ts"; import type { FederationImpl } from "../middleware.ts"; import { type MessageQueue, ParallelMessageQueue } from "../mq.ts"; import type { TaskMessage } from "../queue.ts"; @@ -95,11 +96,19 @@ const enqueueTasks = ( ctx.federation._startQueueInternal(ctx.data); } try { - await dispatch(queue, messages, { - delay: getDurationIfDefined(options.delay), - orderingKey: options.orderingKey, - deduplicationKey: claim.forwardedDeduplicationKey, - }); + await dispatch( + queue, + messages, + { + delay: getDurationIfDefined(options.delay), + orderingKey: options.orderingKey, + deduplicationKey: claim.forwardedDeduplicationKey, + }, + { + meterProvider: ctx.federation.meterProvider, + taskName: task.name, + }, + ); } catch (error) { if (claim.rollback != null) { try { @@ -251,7 +260,12 @@ async function dispatch( orderingKey?: string; deduplicationKey?: string; }, + { meterProvider, taskName }: { + meterProvider: MeterProvider | undefined; + taskName: string; + }, ): Promise { + const metrics = getFederationMetrics(meterProvider); if (messages.length === 1) { await queue.enqueue(messages[0], options); } else if (queue.enqueueMany != null) { @@ -261,8 +275,24 @@ async function dispatch( delay: options.delay, orderingKey: options.orderingKey, }; - await Promise.all(messages.map((m) => queue.enqueue(m, fanoutOptions))); + const settled = await Promise.allSettled( + messages.map(async (message) => { + await queue.enqueue(message, fanoutOptions); + metrics.recordQueueTaskEnqueued( + { role: "task", queue, taskName }, + message.attempt, + ); + }), + ); + const rejected = settled.find((result) => result.status === "rejected"); + if (rejected != null) throw rejected.reason; + return; } + metrics.recordQueueTaskEnqueued( + { role: "task", queue, taskName }, + messages[0].attempt, + messages.length, + ); } /** diff --git a/packages/fedify/src/federation/tasks/tasks.test.ts b/packages/fedify/src/federation/tasks/tasks.test.ts index c42c7bb94..28fce13d1 100644 --- a/packages/fedify/src/federation/tasks/tasks.test.ts +++ b/packages/fedify/src/federation/tasks/tasks.test.ts @@ -1,5 +1,12 @@ -import { mockDocumentLoader, test } from "@fedify/fixture"; +import { + createTestMeterProvider, + createTestTracerProvider, + mockDocumentLoader, + test, +} from "@fedify/fixture"; import { Note } from "@fedify/vocab"; +import { propagation, SpanStatusCode } from "@opentelemetry/api"; +import { W3CTraceContextPropagator } from "@opentelemetry/core"; import { delay } from "es-toolkit"; import { deepStrictEqual, @@ -17,9 +24,20 @@ import { } from "../../testing/mod.ts"; import { createFederationBuilder } from "../builder.ts"; import type { Context } from "../context.ts"; -import type { Federatable } from "../federation.ts"; +import type { Federatable, FederationOptions } from "../federation.ts"; +import { + type KvKey, + type KvStore, + type KvStoreListEntry, + type KvStoreSetOptions, + MemoryKvStore, +} from "../kv.ts"; import { createFederation, type FederationImpl } from "../middleware.ts"; -import { InProcessMessageQueue } from "../mq.ts"; +import { + InProcessMessageQueue, + type MessageQueue, + type MessageQueueEnqueueOptions, +} from "../mq.ts"; import type { TaskMessage } from "../queue.ts"; import TaskCodec from "./codec.ts"; import type { TaskDefinition, TaskRegistry } from "./task.ts"; @@ -704,3 +722,970 @@ test("processQueuedTask() task dispatch", async (t) => { strictEqual(queue.enqueued.length, 0); }); }); + +/** + * A {@link KvStore} that delegates to an in-memory store but deliberately + * omits `cas`, so that `kv.cas == null`. This drives the deduplication + * fallback branches that fire when no conditional-write primitive exists. + */ +class CaslessKvStore implements KvStore { + readonly inner = new MemoryKvStore(); + get(key: KvKey): Promise { + return this.inner.get(key); + } + set(key: KvKey, value: unknown, options?: KvStoreSetOptions): Promise { + return this.inner.set(key, value, options); + } + delete(key: KvKey): Promise { + return this.inner.delete(key); + } + list(prefix?: KvKey): AsyncIterable { + return this.inner.list(prefix); + } + // No `cas`: the fallback branch is reached precisely when `kv.cas == null`. +} + +async function collectKeys(kv: KvStore, prefix: KvKey): Promise { + const keys: KvKey[] = []; + for await (const { key } of kv.list(prefix)) keys.push(key); + return keys; +} + +const TASK_DEDUP_PREFIX: KvKey = ["_fedify", "taskDeduplication"]; +const ACTIVITY_IDEMPOTENCE_PREFIX: KvKey = ["_fedify", "activityIdempotence"]; + +test("task deduplication", async (t) => { + await t.step( + "forwards the key to a nativeDeduplication queue without writing KV", + async () => { + const queue = new MockQueue({ nativeDeduplication: true }); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].options?.deduplicationKey, "k"); + // The backend owns the check, so Fedify must not write any KV marker. + strictEqual((await collectKeys(kv, TASK_DEDUP_PREFIX)).length, 0); + }, + ); + + await t.step( + "skips a second enqueue with the same key within the TTL", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + }); + const task = federation.defineTask("kv-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "first", { deduplicationKey: "k" }); + await ctx.enqueueTask(task, "second", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].message.taskName, "kv-dedup"); + // A non-native queue never receives a key it would ignore. + strictEqual(queue.enqueued[0].options?.deduplicationKey, undefined); + }, + ); + + await t.step( + "re-enqueues with the same key after the TTL expires", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + taskDeduplicationTtl: { milliseconds: 100 }, + }); + const task = federation.defineTask("kv-dedup-ttl", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "first", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + // Wait comfortably past the 100 ms TTL so the marker expires. + await delay(300); + await ctx.enqueueTask(task, "second", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 2); + }, + ); + + await t.step( + 'rejects with TypeError when fallback is "closed" and no cas exists', + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new CaslessKvStore(), + queue: { task: queue }, + taskDeduplicationFallback: "closed", + }); + const task = federation.defineTask("closed-fallback", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await rejects( + () => ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }), + { name: "TypeError" }, + ); + strictEqual(queue.enqueued.length, 0); + }, + ); + + await t.step( + 'proceeds when fallback is "open" and no cas exists', + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new CaslessKvStore(), + queue: { task: queue }, + taskDeduplicationFallback: "open", + }); + const task = federation.defineTask("open-fallback", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + // Best-effort fallback never forwards the key to a non-native queue. + strictEqual(queue.enqueued[0].options?.deduplicationKey, undefined); + }, + ); + + await t.step( + "writes only under taskDeduplication, never activityIdempotence", + async () => { + const queue = new MockQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("prefix-isolation", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + strictEqual((await collectKeys(kv, TASK_DEDUP_PREFIX)).length, 1); + strictEqual( + (await collectKeys(kv, ACTIVITY_IDEMPOTENCE_PREFIX)).length, + 0, + ); + }, + ); + + await t.step("applies one batch-level key to enqueueTaskMany", async () => { + const queue = new MockQueue({ supportsEnqueueMany: true }); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + }); + const task = federation.defineTask("batch-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }); + await ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }); + // First batch enqueues all three; the second is skipped entirely. + strictEqual(queue.enqueuedMany.length, 1); + strictEqual(queue.enqueuedMany[0].messages.length, 3); + }); +}); + +test( + "task deduplication validates every payload before reserving the key", + async () => { + const queue = new MockQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("dedup-validation", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + // A rejected payload must neither enqueue nor consume the key. + await rejects(() => + ctx.enqueueTask(task, 123 as unknown as string, { + deduplicationKey: "k", + }) + ); + strictEqual(queue.enqueued.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + + // The same key must remain usable by the first valid enqueue. + await ctx.enqueueTask(task, "valid", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + deepStrictEqual( + await collectKeys(kv, TASK_DEDUP_PREFIX), + [[...TASK_DEDUP_PREFIX, "k"]], + ); + + // Once the valid enqueue reserves it, the same key must deduplicate. + await ctx.enqueueTask(task, "duplicate", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + }, +); + +test( + "native task batch deduplication is one enqueueMany operation per call", + async () => { + class NativeBatchDeduplicatingQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly #seen = new Set(); + readonly attempts: { + messages: readonly TaskMessage[]; + options?: MessageQueueEnqueueOptions; + }[] = []; + readonly accepted: { + messages: readonly TaskMessage[]; + options?: MessageQueueEnqueueOptions; + }[] = []; + + enqueue(): Promise { + throw new Error("A multi-item native batch must use enqueueMany()."); + } + + enqueueMany( + messages: readonly TaskMessage[], + options?: MessageQueueEnqueueOptions, + ): Promise { + const key = options?.deduplicationKey; + if (key == null) { + throw new TypeError( + "Native batch enqueue requires a deduplication key.", + ); + } + this.attempts.push({ messages, options }); + if (this.#seen.has(key)) return Promise.resolve(); + this.#seen.add(key); + this.accepted.push({ messages, options }); + return Promise.resolve(); + } + + listen(): Promise { + return Promise.resolve(); + } + } + + const queue = new NativeBatchDeduplicatingQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-batch-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await ctx.enqueueTaskMany(task, ["a1", "a2", "a3"], { + deduplicationKey: "batch-a", + }); + await ctx.enqueueTaskMany( + task, + ["duplicate1", "duplicate2", "duplicate3"], + { + deduplicationKey: "batch-a", + }, + ); + await ctx.enqueueTaskMany(task, ["b1", "b2", "b3"], { + deduplicationKey: "batch-b", + }); + + // Every API call reaches the backend exactly once, with one key governing + // all three messages. The backend accepts complete batches or none. + strictEqual(queue.attempts.length, 3); + deepStrictEqual( + queue.attempts.map(({ messages }) => messages.length), + [3, 3, 3], + ); + deepStrictEqual( + queue.attempts.map(({ options }) => options?.deduplicationKey), + ["batch-a", "batch-a", "batch-b"], + ); + strictEqual(queue.accepted.length, 2); + deepStrictEqual( + queue.accepted.map(({ messages }) => messages.length), + [3, 3], + ); + deepStrictEqual( + queue.accepted.map(({ options }) => options?.deduplicationKey), + ["batch-a", "batch-b"], + ); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +test( + "native task batch deduplication rejects without enqueueMany", + async () => { + const queue = new MockQueue({ nativeDeduplication: true }); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-batch-without-enqueue-many", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await rejects( + () => + ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }), + { name: "TypeError", message: /enqueueMany/ }, + ); + + // Reject before any partial enqueue or fallback KV write. Silently + // dropping the key from items 2..n cannot satisfy these assertions. + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + + // A one-item batch is representable by enqueue() and must remain valid. + await ctx.enqueueTaskMany(task, ["single"], { + deduplicationKey: "single", + }); + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].options?.deduplicationKey, "single"); + }, +); + +test( + "deduplication - native batch capability errors precede payload validation", + async () => { + let validationCalls = 0; + const schema = makeSchema((data): data is string => { + validationCalls++; + return typeof data === "string"; + }); + const queue = new MockQueue({ nativeDeduplication: true }); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-batch-capability-order", { + schema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + let caught: unknown; + try { + await ctx.enqueueTaskMany( + task, + [1, 2, 3] as unknown as readonly string[], + { deduplicationKey: "batch" }, + ); + } catch (error) { + caught = error; + } + + // The queue capability makes this request impossible regardless of the + // payload, so no user-supplied validator may run first. + strictEqual(validationCalls, 0); + ok(caught instanceof TypeError); + ok(caught.message.includes("enqueueMany")); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +test( + "closed deduplication fallback errors precede payload validation", + async () => { + let validationCalls = 0; + const schema = makeSchema((data): data is string => { + validationCalls++; + return typeof data === "string"; + }); + const queue = new MockQueue(); + const kv = new CaslessKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + taskDeduplicationFallback: "closed", + }); + const task = federation.defineTask("closed-fallback-order", { + schema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + let caught: unknown; + try { + await ctx.enqueueTask( + task, + 1 as unknown as string, + { deduplicationKey: "k" }, + ); + } catch (error) { + caught = error; + } + + // Closed fallback is a configuration-level rejection. It must be + // deterministic and independent of user payload validation. + strictEqual(validationCalls, 0); + ok(caught instanceof TypeError); + ok(caught.message.includes("conditional write")); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +/** Wires test telemetry doubles into a fresh federation for the suite below. */ +const instrument = (options: FederationOptions) => { + const [meterProvider, recorder] = createTestMeterProvider(); + const [tracerProvider, exporter] = createTestTracerProvider(); + const federation = createFederation({ + ...options, + meterProvider, + tracerProvider, + }) as FederationImpl; + return { federation, recorder, exporter }; +}; + +test("task observability", async (t) => { + await t.step( + "opens a fedify.task span carrying name and attempt on success", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("sendDigest", { + schema: stringSchema, + handler: () => {}, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("sendDigest", "payload"), + ); + + const spans = exporter.getSpans("fedify.task"); + strictEqual(spans.length, 1); + strictEqual(spans[0].attributes["fedify.task.name"], "sendDigest"); + strictEqual(spans[0].attributes["fedify.task.attempt"], 0); + // A completed task carries no failure reason on its span… + strictEqual(spans[0].attributes["fedify.task.failure_reason"], undefined); + strictEqual(spans[0].status.code, SpanStatusCode.UNSET); + + const started = recorder.getMeasurements("fedify.queue.task.started"); + strictEqual(started.length, 1); + strictEqual(started[0].attributes["fedify.queue.role"], "task"); + strictEqual(started[0].attributes["fedify.task.name"], "sendDigest"); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + strictEqual(completed.length, 1); + strictEqual(completed[0].attributes["fedify.queue.role"], "task"); + strictEqual(completed[0].attributes["fedify.task.name"], "sendDigest"); + strictEqual( + completed[0].attributes["fedify.queue.task.result"], + "completed", + ); + // …nor on its outcome metric. + strictEqual( + completed[0].attributes["fedify.task.failure_reason"], + undefined, + ); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + }, + ); + + await t.step( + "inherits the parent trace context from the enqueue site", + async () => { + // The worker extracts the parent through the global propagator; a real + // W3C propagator is required because the default is a no-op. + const traceId = "0af7651916cd43dd8448eb211c80319c"; + const spanId = "b7ad6b7169203331"; + propagation.setGlobalPropagator(new W3CTraceContextPropagator()); + try { + const queue = new MockQueue(); + const { federation, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("traced", { + schema: stringSchema, + handler: () => {}, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("traced", "payload", { + traceContext: { + traceparent: `00-${traceId}-${spanId}-01`, + }, + }), + ); + + const span = exporter.getSpans("fedify.task")[0]; + ok(span != null); + strictEqual(span.spanContext().traceId, traceId); + strictEqual(span.parentSpanContext?.spanId, spanId); + } finally { + propagation.disable(); + } + }, + ); + + await t.step( + "attributes a deserialization failure and drops without retry", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + let called = 0; + federation.defineTask("decode-me", { + schema: stringSchema, + handler: () => { + called++; + }, + }); + const message = await makeTaskMessage("decode-me", "payload"); + await federation.processQueuedTask(undefined, { + ...message, + data: "garbage that is not devalue", + }); + + strictEqual(called, 0); + strictEqual(queue.enqueued.length, 0); // dropped, not retried + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + strictEqual(failed.length, 1); + strictEqual(failed[0].attributes["fedify.queue.role"], "task"); + strictEqual( + failed[0].attributes["fedify.task.failure_reason"], + "deserialization", + ); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual( + span.attributes["fedify.task.failure_reason"], + "deserialization", + ); + // A dropped payload is a failed outcome, so the span status is ERROR. + strictEqual(span.status.code, SpanStatusCode.ERROR); + }, + ); + + await t.step( + "attributes a validation failure and drops without retry", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + let called = 0; + federation.defineTask("strict-shape", { + schema: numberSchema, // expects a number… + handler: () => { + called++; + }, + }); + // …but a valid devalue payload carries a string. + await federation.processQueuedTask( + undefined, + await makeTaskMessage("strict-shape", "not a number"), + ); + + strictEqual(called, 0); + strictEqual(queue.enqueued.length, 0); // dropped, not retried + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + strictEqual(failed.length, 1); + strictEqual( + failed[0].attributes["fedify.task.failure_reason"], + "validation", + ); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual( + span.attributes["fedify.task.failure_reason"], + "validation", + ); + strictEqual(span.status.code, SpanStatusCode.ERROR); + }, + ); + + await t.step( + "attributes an unknown task and drops without retry", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("never-registered", "payload"), + ); + + strictEqual(queue.enqueued.length, 0); // dropped, not retried + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + strictEqual(failed.length, 1); + strictEqual( + failed[0].attributes["fedify.task.failure_reason"], + "unknown_task", + ); + // The backend label is still populated on an unknown-task drop. + strictEqual(failed[0].attributes["fedify.queue.backend"], "MockQueue"); + // Cardinality guard: an unregistered, wire-derived task name must NOT + // become a metric attribute—it would spawn unbounded time series… + strictEqual(failed[0].attributes["fedify.task.name"], undefined); + strictEqual( + recorder.getMeasurements("fedify.queue.task.started")[0] + .attributes["fedify.task.name"], + undefined, + ); + const span = exporter.getSpans("fedify.task")[0]; + // …but the span still carries the real name for tracing the drop. + strictEqual(span.attributes["fedify.task.name"], "never-registered"); + strictEqual( + span.attributes["fedify.task.failure_reason"], + "unknown_task", + ); + strictEqual(span.status.code, SpanStatusCode.ERROR); + }, + ); + + await t.step( + "attributes a handler failure on a terminal give-up", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("explode", { + schema: stringSchema, + handler: () => { + throw new Error("boom"); + }, + retryPolicy: () => null, // give up immediately + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("explode", "payload"), + ); + + strictEqual(queue.enqueued.length, 0); // gave up, no retry + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + strictEqual(failed.length, 1); + strictEqual( + failed[0].attributes["fedify.task.failure_reason"], + "handler", + ); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual( + span.attributes["fedify.task.failure_reason"], + "handler", + ); + // A terminal give-up is a failed outcome, so the span status is ERROR. + strictEqual(span.status.code, SpanStatusCode.ERROR); + }, + ); + + await t.step( + "reports the resolved outbox queue as the backend on fallback", + async () => { + // A distinctly named queue so the backend label is unambiguous. + class FallbackOutboxQueue extends MockQueue {} + const outboxQueue = new FallbackOutboxQueue(); + const { federation, recorder } = instrument({ + ...baseOptions, + queue: { outbox: outboxQueue }, // no dedicated task queue + }); + federation.defineTask("fallback", { + schema: stringSchema, + handler: () => {}, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("fallback", "payload"), + ); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + strictEqual(completed.length, 1); + strictEqual( + completed[0].attributes["fedify.queue.backend"], + "FallbackOutboxQueue", + ); + }, + ); + + await t.step( + "records an enqueue measurement with role task", + async () => { + const queue = new MockQueue(); + const { federation, recorder } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("enqueue-me", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload"); + + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + strictEqual(enqueued.length, 1); + strictEqual(enqueued[0].attributes["fedify.queue.role"], "task"); + strictEqual(enqueued[0].attributes["fedify.task.name"], "enqueue-me"); + strictEqual(enqueued[0].attributes["fedify.queue.task.attempt"], 0); + }, + ); + + await t.step( + "records the retry re-enqueue with role task and a bumped attempt", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("retry-me", { + schema: stringSchema, + handler: () => { + throw new Error("boom"); + }, + retryPolicy: () => Temporal.Duration.from({ milliseconds: 1 }), + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("retry-me", "payload"), + ); + + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].message.attempt, 1); + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + strictEqual(enqueued.length, 1); + strictEqual(enqueued[0].attributes["fedify.queue.role"], "task"); + strictEqual(enqueued[0].attributes["fedify.task.name"], "retry-me"); + strictEqual(enqueued[0].attributes["fedify.queue.task.attempt"], 1); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual(span.attributes["fedify.task.failure_reason"], undefined); + strictEqual(span.status.code, SpanStatusCode.UNSET); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + strictEqual(completed.length, 1); + strictEqual( + completed[0].attributes["fedify.task.failure_reason"], + undefined, + ); + }, + ); + + await t.step( + "records an abort as aborted, without a failure reason or error status", + async () => { + const queue = new MockQueue({ nativeRetrial: true }); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("aborts", { + schema: stringSchema, + handler: () => { + throw globalThis.Object.assign(new Error("shutting down"), { + name: "AbortError", + }); + }, + }); + const message = await makeTaskMessage("aborts", "payload"); + await rejects( + () => federation.processQueuedTask(undefined, message), + { name: "AbortError" }, + ); + + const span = exporter.getSpans("fedify.task")[0]; + ok(span != null); + strictEqual(span.attributes["fedify.task.failure_reason"], undefined); + strictEqual(span.status.code, SpanStatusCode.UNSET); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + const durations = recorder.getMeasurements("fedify.queue.task.duration"); + strictEqual(durations.length, 1); + strictEqual( + durations[0].attributes["fedify.queue.task.result"], + "aborted", + ); + }, + ); + + await t.step( + "on a non-native queue an aborted handler that gives up is aborted", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("aborts-give-up", { + schema: stringSchema, + handler: () => { + throw globalThis.Object.assign(new Error("shutting down"), { + name: "AbortError", + }); + }, + retryPolicy: () => null, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("aborts-give-up", "payload"), + ); + + strictEqual(queue.enqueued.length, 0); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual(span.attributes["fedify.task.failure_reason"], undefined); + strictEqual(span.status.code, SpanStatusCode.UNSET); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + strictEqual( + recorder.getMeasurements("fedify.queue.task.completed").length, + 0, + ); + const durations = recorder.getMeasurements("fedify.queue.task.duration"); + strictEqual(durations.length, 1); + strictEqual( + durations[0].attributes["fedify.queue.task.result"], + "aborted", + ); + }, + ); + + await t.step( + "on a non-native queue an aborted handler is retried, not failed", + async () => { + const queue = new MockQueue(); // nativeRetrial: false + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("aborts-soft", { + schema: stringSchema, + handler: () => { + throw globalThis.Object.assign(new Error("shutting down"), { + name: "AbortError", + }); + }, + retryPolicy: () => Temporal.Duration.from({ milliseconds: 1 }), + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("aborts-soft", "payload"), + ); + + strictEqual(queue.enqueued.length, 1); // retried, behavior unchanged + strictEqual(queue.enqueued[0].message.attempt, 1); + // No `handler` failure leaks onto the span or any failure metric… + const span = exporter.getSpans("fedify.task")[0]; + strictEqual(span.attributes["fedify.task.failure_reason"], undefined); + strictEqual(span.status.code, SpanStatusCode.UNSET); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + // …and the swallowed-into-retry attempt records `completed`, matching the + // inbox/outbox internal-retry convention. + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + strictEqual(completed.length, 1); + strictEqual( + completed[0].attributes["fedify.task.failure_reason"], + undefined, + ); + }, + ); +}); diff --git a/packages/init/src/package.test.ts b/packages/init/src/package.test.ts index 97a33247b..39cf17011 100644 --- a/packages/init/src/package.test.ts +++ b/packages/init/src/package.test.ts @@ -13,6 +13,7 @@ async function assertTargetExists(path: string): Promise { test( "package.json entrypoints match built init files", + { skip: "Deno" in globalThis }, async () => { const packageJson = JSON.parse( await readFile(resolve(packageDir, "package.json"), "utf8"),