From c1ac7e8bd6455a32a1054e5e0b048b9f79f23797 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 5 Jul 2026 08:27:54 +0900 Subject: [PATCH 01/11] Expire circuit breaker state Add TTLs to circuit breaker state written by the default failure policy so unreachable hosts do not leave per-host state behind forever. Custom failure policies can opt in with stateTtl because Fedify cannot infer their retention window. Migrate valid legacy state from earlier 2.3 releases instead of deleting active circuits, and guard the best-effort migration with a CAS marker so concurrent workers do not race or block delivery behind another sweep. Fixes https://github.com/fedify-dev/fedify/issues/916 https://github.com/fedify-dev/fedify/pull/917 Assisted-by: Codex:gpt-5.5 --- CHANGES.md | 13 + .../src/federation/circuit-breaker.test.ts | 384 +++++++++++++++++- .../fedify/src/federation/circuit-breaker.ts | 276 ++++++++++++- .../fedify/src/federation/middleware.test.ts | 16 + 4 files changed, 680 insertions(+), 9 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 8f3619777..5c2d43b14 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,19 @@ Version 2.3.2 To be released. +### @fedify/fedify + + - Fixed the outbound delivery circuit breaker retaining per-host state in the + configured key–value store forever when a remote host never recovered. + Circuit breaker state now receives a TTL on writes made with the default + failure policy, custom failure policies can opt in with the new `stateTtl` + option, and stale state written by earlier 2.3 releases is cleared + automatically on CAS-backed stores after upgrade, with another sweep after a + grace window to cover rolling deployments. [[#916], [#917]] + +[#916]: https://github.com/fedify-dev/fedify/issues/916 +[#917]: https://github.com/fedify-dev/fedify/pull/917 + Version 2.3.1 ------------- diff --git a/packages/fedify/src/federation/circuit-breaker.test.ts b/packages/fedify/src/federation/circuit-breaker.test.ts index f1d8cc3cd..30de33b23 100644 --- a/packages/fedify/src/federation/circuit-breaker.test.ts +++ b/packages/fedify/src/federation/circuit-breaker.test.ts @@ -1,11 +1,17 @@ import { test } from "@fedify/fixture"; -import { assertEquals, assertThrows } from "@std/assert"; +import { assertEquals, assertRejects, assertThrows } from "@std/assert"; import { CircuitBreaker, normalizeCircuitBreakerOptions, parseCircuitBreakerKvState, } from "./circuit-breaker.ts"; -import { type KvKey, type KvStoreSetOptions, MemoryKvStore } from "./kv.ts"; +import { + type KvKey, + type KvStore, + type KvStoreListEntry, + type KvStoreSetOptions, + MemoryKvStore, +} from "./kv.ts"; class AlwaysConflictingKvStore extends MemoryKvStore { attempts = 0; @@ -26,6 +32,7 @@ class AlwaysConflictingKvStore extends MemoryKvStore { class CountingCasKvStore extends MemoryKvStore { attempts = 0; + options: (KvStoreSetOptions | undefined)[] = []; override cas( key: KvKey, @@ -34,10 +41,100 @@ class CountingCasKvStore extends MemoryKvStore { options?: KvStoreSetOptions, ): Promise { this.attempts++; + this.options.push(options); return super.cas(key, expectedValue, newValue, options); } } +class CountingSetKvStore implements KvStore { + #store = new MemoryKvStore(); + keys: KvKey[] = []; + options: (KvStoreSetOptions | undefined)[] = []; + + get(key: KvKey): Promise { + return this.#store.get(key); + } + + set( + key: KvKey, + value: unknown, + options?: KvStoreSetOptions, + ): Promise { + this.keys.push(key); + this.options.push(options); + return this.#store.set(key, value, options); + } + + delete(key: KvKey): Promise { + return this.#store.delete(key); + } + + list(prefix?: KvKey): AsyncIterable { + return this.#store.list(prefix); + } +} + +class CountingSweepKvStore extends MemoryKvStore { + listCalls = 0; + deletedKeys: KvKey[] = []; + + override async *list(prefix?: KvKey) { + this.listCalls++; + yield* super.list(prefix); + } + + override delete(key: KvKey): Promise { + this.deletedKeys.push(key); + return super.delete(key); + } +} + +class UpdatingDuringListKvStore extends MemoryKvStore { + override async *list(prefix?: KvKey) { + for await (const entry of super.list(prefix)) { + if ( + entry.key.length === 3 && + entry.key[0] === "_fedify" && + entry.key[1] === "circuit" && + entry.key[2] === "racy.example" + ) { + await super.set(entry.key, { + state: "closed", + failures: ["2026-05-25T00:01:00Z"], + }); + } + yield entry; + } + } +} + +class FailingOnceSweepKvStore extends MemoryKvStore { + listCalls = 0; + deletedKeys: KvKey[] = []; + + override async *list(prefix?: KvKey) { + this.listCalls++; + if (this.listCalls === 1) { + throw new Error("transient list failure"); + } + yield* super.list(prefix); + } + + override delete(key: KvKey): Promise { + this.deletedKeys.push(key); + return super.delete(key); + } +} + +function markLegacySweepDone(kv: MemoryKvStore): Promise { + return kv.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { state: "final" }); +} + test("normalizeCircuitBreakerOptions() uses numeric failure policy", () => { const options = normalizeCircuitBreakerOptions({ failureThreshold: 3, @@ -167,6 +264,7 @@ test("parseCircuitBreakerKvState() validates stored shape", () => { failures: ["2026-05-25T00:00:00Z"], opened: "2026-05-25T00:00:00Z", halfOpened: "2026-05-25T00:00:00Z", + __fedifyCircuitBreakerStateVersion: 1, }), { state: "open", @@ -298,6 +396,7 @@ test("CircuitBreaker opens, probes, closes, and drops held activities", async () test("CircuitBreaker recovers stale half-open probes", async () => { const kv = new MemoryKvStore(); + await markLegacySweepDone(kv); let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); const circuit = new CircuitBreaker({ kv, @@ -343,6 +442,7 @@ test("CircuitBreaker recovers stale half-open probes", async () => { test("CircuitBreaker caps held delays at activity TTL", async () => { const kv = new MemoryKvStore(); + await markLegacySweepDone(kv); const now = Temporal.Instant.from("2026-05-25T00:05:00Z"); const circuit = new CircuitBreaker({ kv, @@ -402,6 +502,7 @@ test("CircuitBreaker caps held delays at activity TTL", async () => { test("CircuitBreaker ignores malformed held timestamps", async () => { const kv = new MemoryKvStore(); + await markLegacySweepDone(kv); const now = Temporal.Instant.from("2026-05-25T00:05:00Z"); const circuit = new CircuitBreaker({ kv, @@ -430,6 +531,7 @@ test("CircuitBreaker ignores malformed held timestamps", async () => { test("CircuitBreaker bounds beforeSend CAS retries", async () => { let kv = new AlwaysConflictingKvStore(); + await markLegacySweepDone(kv); const now = Temporal.Instant.from("2026-05-25T00:30:00Z"); let circuit = new CircuitBreaker({ kv, @@ -456,6 +558,7 @@ test("CircuitBreaker bounds beforeSend CAS retries", async () => { }); kv = new AlwaysConflictingKvStore(); + await markLegacySweepDone(kv); circuit = new CircuitBreaker({ kv, prefix: ["_fedify", "circuit"], @@ -484,6 +587,7 @@ test("CircuitBreaker bounds beforeSend CAS retries", async () => { test("CircuitBreaker skips recording failures for open circuits", async () => { const kv = new CountingCasKvStore(); + await markLegacySweepDone(kv); const circuit = new CircuitBreaker({ kv, prefix: ["_fedify", "circuit"], @@ -507,6 +611,282 @@ test("CircuitBreaker skips recording failures for open circuits", async () => { ); }); +test("CircuitBreaker writes stored states with a TTL", async () => { + const kv = new CountingCasKvStore(); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { + failureThreshold: 2, + failureWindow: { hours: 3 }, + recoveryDelay: { hours: 1 }, + heldActivityTtl: { days: 2 }, + }, + }); + + await circuit.recordFailure("remote.example"); + + assertEquals(kv.options.at(-1), { + ttl: Temporal.Duration.from({ days: 2 }), + }); +}); + +test("CircuitBreaker writes stored states with a TTL without CAS", async () => { + const kv = new CountingSetKvStore(); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { + failureThreshold: 2, + failureWindow: { hours: 3 }, + recoveryDelay: { hours: 1 }, + heldActivityTtl: { days: 2 }, + }, + }); + + await circuit.recordFailure("remote.example"); + + assertEquals(kv.keys, [["_fedify", "circuit", "remote.example"]]); + assertEquals(kv.options.at(-1), { + ttl: Temporal.Duration.from({ days: 2 }), + }); +}); + +test("CircuitBreaker preserves custom failure history by default", async () => { + const kv = new CountingCasKvStore(); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { + failure: (timestamps) => timestamps.length >= 2, + }, + }); + + await circuit.recordFailure("remote.example"); + + assertEquals(kv.options.at(-1), undefined); +}); + +test("CircuitBreaker accepts an explicit TTL with custom failure policies", async () => { + const kv = new CountingCasKvStore(); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { + failure: (timestamps) => timestamps.length >= 2, + stateTtl: { days: 14 }, + }, + }); + + await circuit.recordFailure("remote.example"); + + assertEquals(kv.options.at(-1), { + ttl: Temporal.Duration.from({ days: 14 }), + }); +}); + +test("CircuitBreaker migrates legacy states without TTL once", async () => { + const kv = new CountingSweepKvStore(); + await kv.set(["_fedify", "circuit", "stale-a.example"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + await kv.set(["_fedify", "circuit", "stale-b.example"], { + state: "open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + }); + await kv.set(["_fedify", "circuit", "malformed.example"], "stale"); + await kv.set(["_fedify", "other", "untouched.example"], "keep"); + let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { failureThreshold: 2 }, + }); + + assertEquals(await circuit.beforeSend("stale-b.example", {}), { + type: "hold", + state: "open", + delay: Temporal.Duration.from({ minutes: 30 }), + heldSince: Temporal.Instant.from("2026-05-25T00:00:00Z"), + }); + + assertEquals( + await kv.get(["_fedify", "circuit", "stale-a.example"]), + { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + __fedifyCircuitBreakerStateVersion: 1, + }, + ); + assertEquals( + await kv.get(["_fedify", "circuit", "stale-b.example"]), + { + state: "open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + __fedifyCircuitBreakerStateVersion: 1, + }, + ); + assertEquals( + await kv.get(["_fedify", "circuit", "malformed.example"]), + undefined, + ); + assertEquals( + await kv.get(["_fedify", "other", "untouched.example"]), + "keep", + ); + await circuit.recordFailure("remote.example"); + assertEquals(await circuit.getState("remote.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + assertEquals(kv.listCalls, 1); + assertEquals(kv.deletedKeys, [ + ["_fedify", "circuit", "malformed.example"], + ]); + assertEquals( + await kv.get([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ]), + { state: "done", retryUntil: "2026-06-01T00:00:00Z" }, + ); + + await kv.set(["_fedify", "circuit", "late-stale.example"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + await circuit.recordFailure("another.example"); + + assertEquals(await kv.get(["_fedify", "circuit", "late-stale.example"]), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + assertEquals(kv.listCalls, 1); + + now = Temporal.Instant.from("2026-06-02T00:00:00Z"); + await circuit.recordFailure("final.example"); + + assertEquals( + await kv.get(["_fedify", "circuit", "late-stale.example"]), + { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + __fedifyCircuitBreakerStateVersion: 1, + }, + ); + assertEquals(await circuit.getState("another.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + assertEquals(kv.listCalls, 2); + assertEquals( + await kv.get([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ]), + { state: "final" }, + ); +}); + +test("CircuitBreaker does not sweep state changed after listing", async () => { + const kv = new UpdatingDuringListKvStore(); + await kv.set(["_fedify", "circuit", "racy.example"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:02:00Z"), + options: { failureThreshold: 2 }, + }); + + await circuit.recordFailure("remote.example"); + + assertEquals(await kv.get(["_fedify", "circuit", "racy.example"]), { + state: "closed", + failures: ["2026-05-25T00:01:00Z"], + }); +}); + +test("CircuitBreaker skips legacy sweep already running elsewhere", async () => { + const kv = new CountingSweepKvStore(); + await kv.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { + state: "sweeping", + started: "2026-05-25T00:00:00Z", + retryUntil: "2026-06-01T00:00:00Z", + }); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { failureThreshold: 2 }, + }); + + await circuit.recordFailure("remote.example"); + + assertEquals(kv.listCalls, 0); + assertEquals(await circuit.getState("remote.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); +}); + +test("CircuitBreaker retries legacy sweep after transient failures", async () => { + const kv = new FailingOnceSweepKvStore(); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { failureThreshold: 2 }, + }); + + await assertRejects( + () => circuit.recordFailure("remote.example"), + Error, + "transient list failure", + ); + assertEquals( + await kv.get([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ]), + undefined, + ); + assertEquals(kv.deletedKeys, [[ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ]]); + await circuit.recordFailure("remote.example"); + + assertEquals(kv.listCalls, 2); + assertEquals(await circuit.getState("remote.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); +}); + test("CircuitBreaker prunes stale closed failure history", async () => { const kv = new MemoryKvStore(); let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); diff --git a/packages/fedify/src/federation/circuit-breaker.ts b/packages/fedify/src/federation/circuit-breaker.ts index 4fd13959a..324cae1dd 100644 --- a/packages/fedify/src/federation/circuit-breaker.ts +++ b/packages/fedify/src/federation/circuit-breaker.ts @@ -19,6 +19,10 @@ export interface CircuitBreakerKvState { readonly halfOpened?: string; } +type StoredCircuitBreakerKvState = CircuitBreakerKvState & { + readonly __fedifyCircuitBreakerStateVersion?: 1; +}; + /** * Details passed to {@link CircuitBreakerOptions.onActivityDrop} when a held * activity expires before the remote host recovers. @@ -81,6 +85,17 @@ export type CircuitBreakerOptions = CircuitBreakerFailurePolicy & { */ readonly releaseInterval?: Temporal.Duration | Temporal.DurationLike; + /** + * How long Fedify keeps circuit breaker state in the configured key-value + * store. + * + * When omitted, Fedify derives this from `failureWindow`, `recoveryDelay`, and + * `heldActivityTtl` for the default numeric failure policy. Custom `failure` + * callbacks do not have an inspectable time window, so custom policies do not + * expire their stored state unless this option is provided. + */ + readonly stateTtl?: Temporal.Duration | Temporal.DurationLike; + /** * Called whenever the circuit state changes. */ @@ -112,11 +127,38 @@ export interface NormalizedCircuitBreakerOptions { readonly recoveryDelay: Temporal.Duration; readonly heldActivityTtl: Temporal.Duration; readonly releaseInterval: Temporal.Duration; + readonly stateTtl: Temporal.Duration | undefined; readonly onStateChange?: CircuitBreakerOptions["onStateChange"]; readonly onActivityDrop?: CircuitBreakerOptions["onActivityDrop"]; } const MAX_CUSTOM_FAILURE_HISTORY = 100; +// Fedify 2.3.0 and 2.3.1 wrote circuit breaker state without a TTL, so those +// soft-state entries could live forever for hosts that never recovered. Keep a +// marker under the circuit prefix to clear old state on CAS-backed stores after +// upgrade, then retry once after a grace window in case old workers wrote more +// no-TTL state during a rolling deployment. See: +// https://github.com/fedify-dev/fedify/issues/916 +const LEGACY_SWEEP_MARKER = [ + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", +] as const; +const LEGACY_SWEEP_DELETING_MARKER = { + __fedifyDeletingCircuitBreakerLegacyState: true, +}; +const CIRCUIT_BREAKER_STATE_VERSION = 1; +const LEGACY_SWEEP_LOCK_TTL = Temporal.Duration.from({ minutes: 5 }); +const LEGACY_SWEEP_RETRY_WINDOW = Temporal.Duration.from({ hours: 24 * 7 }); +const LEGACY_SWEEP_WAIT_INTERVAL = 10; + +type LegacySweepMarker = + | { + readonly state: "sweeping"; + readonly started: string; + readonly retryUntil: string; + } + | { readonly state: "done"; readonly retryUntil: string } + | { readonly state: "final" }; /** * Constructor options for {@link CircuitBreaker}. @@ -174,6 +216,7 @@ export class CircuitBreaker { readonly #prefix: KvKey; readonly #options: NormalizedCircuitBreakerOptions; readonly #now: () => Temporal.Instant; + #legacySweep: Promise | undefined; readonly #stateChangeObserver: | CircuitBreakerCreateOptions["stateChangeObserver"] | undefined; @@ -204,6 +247,7 @@ export class CircuitBreaker { remoteHost: string, message: { readonly circuitHeldSince?: string }, ): Promise { + await this.#sweepLegacyStates(); const heldSince = parseHeldSince(message.circuitHeldSince); const now = this.#now(); if ( @@ -307,6 +351,7 @@ export class CircuitBreaker { async recordSuccess( remoteHost: string, ): Promise { + await this.#sweepLegacyStates(); for (let attempt = 0; attempt < 10; attempt++) { const oldState = await this.#get(remoteHost); if (oldState == null) return undefined; @@ -333,6 +378,7 @@ export class CircuitBreaker { async recordFailure( remoteHost: string, ): Promise { + await this.#sweepLegacyStates(); const now = this.#now(); for (let attempt = 0; attempt < 10; attempt++) { const oldState = await this.#get(remoteHost); @@ -395,13 +441,111 @@ export class CircuitBreaker { async getState( remoteHost: string, ): Promise { - return await this.#get(remoteHost); + await this.#sweepLegacyStates(); + return stripStoredCircuitBreakerState(await this.#get(remoteHost)); } #key(remoteHost: string): KvKey { return [...this.#prefix, remoteHost] as KvKey; } + #legacySweepMarkerKey(): KvKey { + return [...this.#prefix, ...LEGACY_SWEEP_MARKER] as KvKey; + } + + async #sweepLegacyStates(): Promise { + if (this.#kv.cas == null) return; + this.#legacySweep ??= this.#sweepLegacyStatesImpl().finally(() => { + this.#legacySweep = undefined; + }); + await this.#legacySweep; + } + + async #sweepLegacyStatesImpl(): Promise { + const markerKey = this.#legacySweepMarkerKey(); + const marker = await this.#acquireLegacySweep(markerKey); + if (marker === "done") return; + try { + for await (const { key, value } of this.#kv.list(this.#prefix)) { + if (isEqualKvKey(key, markerKey)) continue; + await this.#migrateLegacyState(key, value); + } + } catch (error) { + await this.#deleteIfUnchanged(markerKey, marker); + throw error; + } + await this.#kv.set( + markerKey, + this.#finishLegacySweepMarker(marker), + ); + } + + #finishLegacySweepMarker(marker: LegacySweepMarker): LegacySweepMarker { + if ( + "retryUntil" in marker && + Temporal.Instant.compare( + this.#now(), + Temporal.Instant.from(marker.retryUntil), + ) < + 0 + ) { + return { state: "done", retryUntil: marker.retryUntil }; + } + return { state: "final" }; + } + + async #migrateLegacyState(key: KvKey, value: unknown): Promise { + if (isCurrentCircuitBreakerState(value)) return; + const state = parseCircuitBreakerKvState(value); + if (state != null) { + await this.#kv.cas?.( + key, + value, + markCircuitBreakerState(state), + this.#setOptions(), + ); + return; + } + await this.#deleteIfUnchanged(key, value); + } + + async #deleteIfUnchanged(key: KvKey, value: unknown): Promise { + if (await this.#kv.cas?.(key, value, LEGACY_SWEEP_DELETING_MARKER)) { + await this.#kv.delete(key); + } + } + + async #acquireLegacySweep( + markerKey: KvKey, + ): Promise { + while (true) { + const marker = await this.#kv.get(markerKey); + if (isLegacySweepDone(marker, this.#now())) return "done"; + if (isLegacySweepInProgress(marker)) { + return "done"; + } + if (marker != null && !isLegacySweepRetryDue(marker, this.#now())) { + return "done"; + } + const retryUntil = isLegacySweepRetryDue(marker, this.#now()) + ? marker.retryUntil + : this.#now().add(LEGACY_SWEEP_RETRY_WINDOW).toString(); + const sweeping = { + state: "sweeping", + started: this.#now().toString(), + retryUntil, + } satisfies LegacySweepMarker; + if ( + await this.#kv.cas?.(markerKey, marker ?? undefined, sweeping, { + ttl: LEGACY_SWEEP_LOCK_TTL, + }) + ) { + return sweeping; + } + await delay(LEGACY_SWEEP_WAIT_INTERVAL); + } + } + #capHeldRetryAt( now: Temporal.Instant, heldSince: Temporal.Instant | undefined, @@ -414,27 +558,43 @@ export class CircuitBreaker { : retryAt; } - async #get(remoteHost: string): Promise { - return parseCircuitBreakerKvState( + async #get( + remoteHost: string, + ): Promise { + return parseStoredCircuitBreakerKvState( await this.#kv.get(this.#key(remoteHost)), ); } async #replace( remoteHost: string, - oldState: CircuitBreakerKvState | undefined, + oldState: StoredCircuitBreakerKvState | undefined, newState: CircuitBreakerKvState | undefined, ): Promise { const key = this.#key(remoteHost); + const storedState = newState == null ? undefined : markCircuitBreakerState( + newState, + ); if (this.#kv.cas == null) { - if (newState == null) { + if (storedState == null) { await this.#kv.delete(key); } else { - await this.#kv.set(key, newState); + await this.#kv.set(key, storedState, this.#setOptions()); } return true; } - return await this.#kv.cas(key, oldState, newState); + return await this.#kv.cas( + key, + oldState, + storedState, + storedState == null ? undefined : this.#setOptions(), + ); + } + + #setOptions(): { ttl: Temporal.Duration } | undefined { + return this.#options.stateTtl == null + ? undefined + : { ttl: this.#options.stateTtl }; } async #notifyStateChange( @@ -485,14 +645,21 @@ export function normalizeCircuitBreakerOptions( const releaseInterval = toInstantDuration( options.releaseInterval ?? { seconds: 1 }, ); + const configuredStateTtl = options.stateTtl == null + ? undefined + : toInstantDuration(options.stateTtl); assertPositiveDuration(recoveryDelay, "recoveryDelay"); assertPositiveDuration(heldActivityTtl, "heldActivityTtl"); assertPositiveDuration(releaseInterval, "releaseInterval"); + if (configuredStateTtl != null) { + assertPositiveDuration(configuredStateTtl, "stateTtl"); + } let failure: (timestamps: readonly Temporal.Instant[]) => boolean; let pruneFailures: ( timestamps: readonly Temporal.Instant[], now: Temporal.Instant, ) => readonly Temporal.Instant[]; + let stateTtl: Temporal.Duration | undefined; if (options.failure == null) { const failureThreshold = options.failureThreshold ?? 5; if (!Number.isInteger(failureThreshold) || failureThreshold <= 0) { @@ -516,10 +683,13 @@ export function normalizeCircuitBreakerOptions( const last = timestamps[timestamps.length - 1]; return Temporal.Duration.compare(first.until(last), failureWindow) <= 0; }; + stateTtl = configuredStateTtl ?? + maxDuration(recoveryDelay, heldActivityTtl, failureWindow); } else { failure = options.failure; pruneFailures = (timestamps) => timestamps.slice(-MAX_CUSTOM_FAILURE_HISTORY); + stateTtl = configuredStateTtl; } return { failure, @@ -527,11 +697,91 @@ export function normalizeCircuitBreakerOptions( recoveryDelay, heldActivityTtl, releaseInterval, + stateTtl, onStateChange: options.onStateChange, onActivityDrop: options.onActivityDrop, }; } +function maxDuration( + duration: Temporal.Duration, + ...durations: Temporal.Duration[] +): Temporal.Duration { + return durations.reduce( + (max, candidate) => + Temporal.Duration.compare(candidate, max) > 0 ? candidate : max, + duration, + ); +} + +function isEqualKvKey(left: KvKey, right: KvKey): boolean { + return left.length === right.length && + left.every((part, index) => part === right[index]); +} + +function isLegacySweepDone( + value: unknown, + now: Temporal.Instant, +): value is LegacySweepMarker { + if (typeof value !== "object" || value == null || !("state" in value)) { + return false; + } + if (value.state === "final") return true; + if ( + value.state === "done" && "retryUntil" in value && + typeof value.retryUntil === "string" + ) { + return Temporal.Instant.compare( + now, + Temporal.Instant.from(value.retryUntil), + ) < + 0; + } + return false; +} + +function isLegacySweepRetryDue( + value: unknown, + now: Temporal.Instant, +): value is Extract { + return typeof value === "object" && value != null && + "state" in value && value.state === "done" && + "retryUntil" in value && typeof value.retryUntil === "string" && + Temporal.Instant.compare(now, Temporal.Instant.from(value.retryUntil)) >= 0; +} + +function isLegacySweepInProgress(value: unknown): boolean { + return typeof value === "object" && value != null && + "state" in value && value.state === "sweeping"; +} + +function isCurrentCircuitBreakerState(value: unknown): boolean { + return typeof value === "object" && value != null && + "__fedifyCircuitBreakerStateVersion" in value && + value.__fedifyCircuitBreakerStateVersion === CIRCUIT_BREAKER_STATE_VERSION; +} + +function markCircuitBreakerState( + state: CircuitBreakerKvState, +): StoredCircuitBreakerKvState { + return { + ...state, + __fedifyCircuitBreakerStateVersion: CIRCUIT_BREAKER_STATE_VERSION, + }; +} + +function stripStoredCircuitBreakerState( + state: StoredCircuitBreakerKvState | undefined, +): CircuitBreakerKvState | undefined { + if (state == null) return undefined; + const { __fedifyCircuitBreakerStateVersion: _, ...publicState } = state; + return publicState; +} + +function delay(milliseconds: number): Promise { + return new Promise((resolve) => setTimeout(resolve, milliseconds)); +} + function toInstantDuration( duration: Temporal.Duration | Temporal.DurationLike, ): Temporal.Duration { @@ -580,6 +830,14 @@ function parseHeldSince( export function parseCircuitBreakerKvState( value: unknown, ): CircuitBreakerKvState | undefined { + return stripStoredCircuitBreakerState( + parseStoredCircuitBreakerKvState(value), + ); +} + +function parseStoredCircuitBreakerKvState( + value: unknown, +): StoredCircuitBreakerKvState | undefined { const isInstantString = (v: unknown): v is string => { if (typeof v !== "string") return false; try { @@ -615,5 +873,9 @@ export function parseCircuitBreakerKvState( failures: record.failures, ...(record.opened == null ? {} : { opened: record.opened }), ...(record.halfOpened == null ? {} : { halfOpened: record.halfOpened }), + ...(record.__fedifyCircuitBreakerStateVersion === + CIRCUIT_BREAKER_STATE_VERSION + ? { __fedifyCircuitBreakerStateVersion: CIRCUIT_BREAKER_STATE_VERSION } + : {}), }; } diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index ad073b5df..c9ab4d85d 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -80,6 +80,15 @@ type Assert = T; let logtapeLock: Promise = Promise.resolve(); +function markCircuitBreakerLegacySweepDone(kv: MemoryKvStore): Promise { + return kv.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { state: "final" }); +} + async function withLogtapeLock(fn: () => Promise): Promise { const run = logtapeLock.then(fn, fn); logtapeLock = run.then(() => undefined, () => undefined); @@ -7375,6 +7384,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { const { federation, queued, kv } = setup({ failureThreshold: 1, }); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "success-bookkeeping.example"], { state: "closed", failures: [], @@ -7601,6 +7611,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { }, { permanentFailureStatusCodes: [500] }, ); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "permanent-bookkeeping.example"], { state: "half-open", failures: ["2026-05-25T00:00:00Z"], @@ -7938,6 +7949,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { }, { permanentFailureStatusCodes: [500] }, ); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "permanent-probe.example"], { state: "half-open", failures: ["2026-05-25T00:00:00Z"], @@ -7973,6 +7985,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { failureThreshold: 1, releaseInterval: { seconds: 1 }, }); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "gone.example"], { state: "half-open", failures: ["2026-05-25T00:00:00Z"], @@ -8108,6 +8121,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { releaseInterval: { minutes: 1 }, }, }); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "half-open-telemetry.example"], { state: "half-open", failures: ["2026-05-25T00:00:00Z"], @@ -8166,6 +8180,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { recoveryDelay: { seconds: 1 }, }, }); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "stale-probe-telemetry.example"], { state: "half-open", failures: ["2026-05-25T00:00:00Z"], @@ -8265,6 +8280,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { }, }, }); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "expired-probe.example"], { state: "half-open", failures: ["2026-05-25T00:00:00Z"], From 7f0db201af5f263d8326fc7f16b9105e294e6818 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 5 Jul 2026 08:51:12 +0900 Subject: [PATCH 02/11] Run circuit sweeps in the background Keep the legacy circuit-breaker migration out of the delivery path. Sweep failures are logged and retried later instead of rejecting the operation that happened to start the cleanup. Add test hooks and assertions for the background sweep, including the migration TTL and the temporary deleting marker TTL. Share the test helper that marks the legacy sweep complete so seeded circuit states do not depend on duplicated marker literals. https://github.com/fedify-dev/fedify/pull/917#discussion_r3524003525 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524003530 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524003532 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524003535 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524003537 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524003540 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524003541 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524003542 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524003544 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524003545 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524005204 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524010573 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524010575 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524010576 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524010579 Assisted-by: Codex:gpt-5.5 --- .../federation/circuit-breaker-test-utils.ts | 12 ++++ .../src/federation/circuit-breaker.test.ts | 69 ++++++++++++------- .../fedify/src/federation/circuit-breaker.ts | 39 ++++++++--- .../fedify/src/federation/middleware.test.ts | 10 +-- 4 files changed, 88 insertions(+), 42 deletions(-) create mode 100644 packages/fedify/src/federation/circuit-breaker-test-utils.ts diff --git a/packages/fedify/src/federation/circuit-breaker-test-utils.ts b/packages/fedify/src/federation/circuit-breaker-test-utils.ts new file mode 100644 index 000000000..13fcb0e11 --- /dev/null +++ b/packages/fedify/src/federation/circuit-breaker-test-utils.ts @@ -0,0 +1,12 @@ +import type { KvStore } from "./kv.ts"; + +export function markCircuitBreakerLegacySweepDone( + kv: Pick, +): Promise { + return kv.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { state: "final" }); +} diff --git a/packages/fedify/src/federation/circuit-breaker.test.ts b/packages/fedify/src/federation/circuit-breaker.test.ts index 30de33b23..63516fb65 100644 --- a/packages/fedify/src/federation/circuit-breaker.test.ts +++ b/packages/fedify/src/federation/circuit-breaker.test.ts @@ -1,5 +1,5 @@ import { test } from "@fedify/fixture"; -import { assertEquals, assertRejects, assertThrows } from "@std/assert"; +import { assertEquals, assertThrows } from "@std/assert"; import { CircuitBreaker, normalizeCircuitBreakerOptions, @@ -12,6 +12,7 @@ import { type KvStoreSetOptions, MemoryKvStore, } from "./kv.ts"; +import { markCircuitBreakerLegacySweepDone } from "./circuit-breaker-test-utils.ts"; class AlwaysConflictingKvStore extends MemoryKvStore { attempts = 0; @@ -76,8 +77,22 @@ class CountingSetKvStore implements KvStore { class CountingSweepKvStore extends MemoryKvStore { listCalls = 0; + casCalls: { + key: KvKey; + options: KvStoreSetOptions | undefined; + }[] = []; deletedKeys: KvKey[] = []; + override cas( + key: KvKey, + expectedValue: unknown, + newValue: unknown, + options?: KvStoreSetOptions, + ): Promise { + this.casCalls.push({ key, options }); + return super.cas(key, expectedValue, newValue, options); + } + override async *list(prefix?: KvKey) { this.listCalls++; yield* super.list(prefix); @@ -126,15 +141,6 @@ class FailingOnceSweepKvStore extends MemoryKvStore { } } -function markLegacySweepDone(kv: MemoryKvStore): Promise { - return kv.set([ - "_fedify", - "circuit", - "__fedify_meta", - "circuit_breaker_state_ttl_sweep_v1", - ], { state: "final" }); -} - test("normalizeCircuitBreakerOptions() uses numeric failure policy", () => { const options = normalizeCircuitBreakerOptions({ failureThreshold: 3, @@ -396,7 +402,7 @@ test("CircuitBreaker opens, probes, closes, and drops held activities", async () test("CircuitBreaker recovers stale half-open probes", async () => { const kv = new MemoryKvStore(); - await markLegacySweepDone(kv); + await markCircuitBreakerLegacySweepDone(kv); let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); const circuit = new CircuitBreaker({ kv, @@ -442,7 +448,7 @@ test("CircuitBreaker recovers stale half-open probes", async () => { test("CircuitBreaker caps held delays at activity TTL", async () => { const kv = new MemoryKvStore(); - await markLegacySweepDone(kv); + await markCircuitBreakerLegacySweepDone(kv); const now = Temporal.Instant.from("2026-05-25T00:05:00Z"); const circuit = new CircuitBreaker({ kv, @@ -502,7 +508,7 @@ test("CircuitBreaker caps held delays at activity TTL", async () => { test("CircuitBreaker ignores malformed held timestamps", async () => { const kv = new MemoryKvStore(); - await markLegacySweepDone(kv); + await markCircuitBreakerLegacySweepDone(kv); const now = Temporal.Instant.from("2026-05-25T00:05:00Z"); const circuit = new CircuitBreaker({ kv, @@ -531,7 +537,7 @@ test("CircuitBreaker ignores malformed held timestamps", async () => { test("CircuitBreaker bounds beforeSend CAS retries", async () => { let kv = new AlwaysConflictingKvStore(); - await markLegacySweepDone(kv); + await markCircuitBreakerLegacySweepDone(kv); const now = Temporal.Instant.from("2026-05-25T00:30:00Z"); let circuit = new CircuitBreaker({ kv, @@ -558,7 +564,7 @@ test("CircuitBreaker bounds beforeSend CAS retries", async () => { }); kv = new AlwaysConflictingKvStore(); - await markLegacySweepDone(kv); + await markCircuitBreakerLegacySweepDone(kv); circuit = new CircuitBreaker({ kv, prefix: ["_fedify", "circuit"], @@ -587,7 +593,7 @@ test("CircuitBreaker bounds beforeSend CAS retries", async () => { test("CircuitBreaker skips recording failures for open circuits", async () => { const kv = new CountingCasKvStore(); - await markLegacySweepDone(kv); + await markCircuitBreakerLegacySweepDone(kv); const circuit = new CircuitBreaker({ kv, prefix: ["_fedify", "circuit"], @@ -613,6 +619,7 @@ test("CircuitBreaker skips recording failures for open circuits", async () => { test("CircuitBreaker writes stored states with a TTL", async () => { const kv = new CountingCasKvStore(); + await markCircuitBreakerLegacySweepDone(kv); const circuit = new CircuitBreaker({ kv, prefix: ["_fedify", "circuit"], @@ -656,6 +663,7 @@ test("CircuitBreaker writes stored states with a TTL without CAS", async () => { test("CircuitBreaker preserves custom failure history by default", async () => { const kv = new CountingCasKvStore(); + await markCircuitBreakerLegacySweepDone(kv); const circuit = new CircuitBreaker({ kv, prefix: ["_fedify", "circuit"], @@ -672,6 +680,7 @@ test("CircuitBreaker preserves custom failure history by default", async () => { test("CircuitBreaker accepts an explicit TTL with custom failure policies", async () => { const kv = new CountingCasKvStore(); + await markCircuitBreakerLegacySweepDone(kv); const circuit = new CircuitBreaker({ kv, prefix: ["_fedify", "circuit"], @@ -716,6 +725,7 @@ test("CircuitBreaker migrates legacy states without TTL once", async () => { delay: Temporal.Duration.from({ minutes: 30 }), heldSince: Temporal.Instant.from("2026-05-25T00:00:00Z"), }); + await circuit.pendingSweep; assertEquals( await kv.get(["_fedify", "circuit", "stale-a.example"]), @@ -748,6 +758,15 @@ test("CircuitBreaker migrates legacy states without TTL once", async () => { failures: ["2026-05-25T00:00:00Z"], }); assertEquals(kv.listCalls, 1); + assertEquals( + kv.casCalls.find(({ key }) => + key.length === 3 && + key[0] === "_fedify" && + key[1] === "circuit" && + key[2] === "stale-a.example" + )?.options, + { ttl: Temporal.Duration.from({ days: 7 }) }, + ); assertEquals(kv.deletedKeys, [ ["_fedify", "circuit", "malformed.example"], ]); @@ -775,6 +794,7 @@ test("CircuitBreaker migrates legacy states without TTL once", async () => { now = Temporal.Instant.from("2026-06-02T00:00:00Z"); await circuit.recordFailure("final.example"); + await circuit.pendingSweep; assertEquals( await kv.get(["_fedify", "circuit", "late-stale.example"]), @@ -814,6 +834,7 @@ test("CircuitBreaker does not sweep state changed after listing", async () => { }); await circuit.recordFailure("remote.example"); + await circuit.pendingSweep; assertEquals(await kv.get(["_fedify", "circuit", "racy.example"]), { state: "closed", @@ -858,11 +879,8 @@ test("CircuitBreaker retries legacy sweep after transient failures", async () => options: { failureThreshold: 2 }, }); - await assertRejects( - () => circuit.recordFailure("remote.example"), - Error, - "transient list failure", - ); + await circuit.recordFailure("remote.example"); + await circuit.pendingSweep; assertEquals( await kv.get([ "_fedify", @@ -878,10 +896,15 @@ test("CircuitBreaker retries legacy sweep after transient failures", async () => "__fedify_meta", "circuit_breaker_state_ttl_sweep_v1", ]]); - await circuit.recordFailure("remote.example"); + assertEquals(await circuit.getState("remote.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + await circuit.recordFailure("another.example"); + await circuit.pendingSweep; assertEquals(kv.listCalls, 2); - assertEquals(await circuit.getState("remote.example"), { + assertEquals(await circuit.getState("another.example"), { state: "closed", failures: ["2026-05-25T00:00:00Z"], }); diff --git a/packages/fedify/src/federation/circuit-breaker.ts b/packages/fedify/src/federation/circuit-breaker.ts index 324cae1dd..1882cc76a 100644 --- a/packages/fedify/src/federation/circuit-breaker.ts +++ b/packages/fedify/src/federation/circuit-breaker.ts @@ -247,7 +247,7 @@ export class CircuitBreaker { remoteHost: string, message: { readonly circuitHeldSince?: string }, ): Promise { - await this.#sweepLegacyStates(); + this.#sweepLegacyStates(); const heldSince = parseHeldSince(message.circuitHeldSince); const now = this.#now(); if ( @@ -351,7 +351,7 @@ export class CircuitBreaker { async recordSuccess( remoteHost: string, ): Promise { - await this.#sweepLegacyStates(); + this.#sweepLegacyStates(); for (let attempt = 0; attempt < 10; attempt++) { const oldState = await this.#get(remoteHost); if (oldState == null) return undefined; @@ -378,7 +378,7 @@ export class CircuitBreaker { async recordFailure( remoteHost: string, ): Promise { - await this.#sweepLegacyStates(); + this.#sweepLegacyStates(); const now = this.#now(); for (let attempt = 0; attempt < 10; attempt++) { const oldState = await this.#get(remoteHost); @@ -441,10 +441,18 @@ export class CircuitBreaker { async getState( remoteHost: string, ): Promise { - await this.#sweepLegacyStates(); + this.#sweepLegacyStates(); return stripStoredCircuitBreakerState(await this.#get(remoteHost)); } + /** + * The currently running background legacy sweep, if any. + * @internal + */ + get pendingSweep(): Promise | undefined { + return this.#legacySweep; + } + #key(remoteHost: string): KvKey { return [...this.#prefix, remoteHost] as KvKey; } @@ -453,12 +461,19 @@ export class CircuitBreaker { return [...this.#prefix, ...LEGACY_SWEEP_MARKER] as KvKey; } - async #sweepLegacyStates(): Promise { + #sweepLegacyStates(): void { if (this.#kv.cas == null) return; - this.#legacySweep ??= this.#sweepLegacyStatesImpl().finally(() => { - this.#legacySweep = undefined; - }); - await this.#legacySweep; + if (this.#legacySweep != null) return; + this.#legacySweep = this.#sweepLegacyStatesImpl() + .catch((error) => { + getLogger(["fedify", "federation", "circuit"]).warn( + "Failed to sweep legacy circuit breaker state:\n{error}", + { error }, + ); + }) + .finally(() => { + this.#legacySweep = undefined; + }); } async #sweepLegacyStatesImpl(): Promise { @@ -510,7 +525,11 @@ export class CircuitBreaker { } async #deleteIfUnchanged(key: KvKey, value: unknown): Promise { - if (await this.#kv.cas?.(key, value, LEGACY_SWEEP_DELETING_MARKER)) { + if ( + await this.#kv.cas?.(key, value, LEGACY_SWEEP_DELETING_MARKER, { + ttl: LEGACY_SWEEP_LOCK_TTL, + }) + ) { await this.#kv.delete(key); } } diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index c9ab4d85d..87326c1de 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -73,6 +73,7 @@ import { import { recordInboxActivity } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; import type { InboxMessage, Message, OutboxMessage } from "./queue.ts"; +import { markCircuitBreakerLegacySweepDone } from "./circuit-breaker-test-utils.ts"; type IsEqual = (() => T extends A ? 1 : 2) extends (() => T extends B ? 1 : 2) ? true : false; @@ -80,15 +81,6 @@ type Assert = T; let logtapeLock: Promise = Promise.resolve(); -function markCircuitBreakerLegacySweepDone(kv: MemoryKvStore): Promise { - return kv.set([ - "_fedify", - "circuit", - "__fedify_meta", - "circuit_breaker_state_ttl_sweep_v1", - ], { state: "final" }); -} - async function withLogtapeLock(fn: () => Promise): Promise { const run = logtapeLock.then(fn, fn); logtapeLock = run.then(() => undefined, () => undefined); From 2c1efef678739a78b81a57b59975871cc51507a9 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 5 Jul 2026 10:33:53 +0900 Subject: [PATCH 03/11] Harden circuit sweep markers Keep default circuit breaker state past the recovery delay by adding the failure window to the derived TTL. That preserves an open circuit long enough for the first half-open probe and any immediate failure to reuse the existing state. Treat malformed legacy sweep retry markers as invalid marker state instead of letting Temporal parsing throw from the background sweep. https://github.com/fedify-dev/fedify/pull/917#discussion_r3524022545 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524022552 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524025545 Assisted-by: Codex:gpt-5.5 --- .../src/federation/circuit-breaker.test.ts | 34 ++++++++++++++++++ .../fedify/src/federation/circuit-breaker.ts | 35 +++++++++++++------ 2 files changed, 59 insertions(+), 10 deletions(-) diff --git a/packages/fedify/src/federation/circuit-breaker.test.ts b/packages/fedify/src/federation/circuit-breaker.test.ts index 63516fb65..06102d807 100644 --- a/packages/fedify/src/federation/circuit-breaker.test.ts +++ b/packages/fedify/src/federation/circuit-breaker.test.ts @@ -145,6 +145,8 @@ test("normalizeCircuitBreakerOptions() uses numeric failure policy", () => { const options = normalizeCircuitBreakerOptions({ failureThreshold: 3, failureWindow: { minutes: 10 }, + heldActivityTtl: { hours: 1 }, + recoveryDelay: { hours: 2 }, }); const failures = [ Temporal.Instant.from("2026-05-25T00:00:00Z"), @@ -178,6 +180,10 @@ test("normalizeCircuitBreakerOptions() uses numeric failure policy", () => { "2026-05-25T00:12:00Z", ], ); + assertEquals( + options.stateTtl, + Temporal.Duration.from({ hours: 2, minutes: 10 }), + ); }); test("normalizeCircuitBreakerOptions() validates numeric failure policy", () => { @@ -870,6 +876,34 @@ test("CircuitBreaker skips legacy sweep already running elsewhere", async () => }); }); +test("CircuitBreaker ignores malformed legacy sweep retry markers", async () => { + const kv = new CountingSweepKvStore(); + await kv.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { + state: "done", + retryUntil: "not an instant", + }); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { failureThreshold: 2 }, + }); + + await circuit.recordFailure("remote.example"); + await circuit.pendingSweep; + + assertEquals(kv.listCalls, 0); + assertEquals(await circuit.getState("remote.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); +}); + test("CircuitBreaker retries legacy sweep after transient failures", async () => { const kv = new FailingOnceSweepKvStore(); const circuit = new CircuitBreaker({ diff --git a/packages/fedify/src/federation/circuit-breaker.ts b/packages/fedify/src/federation/circuit-breaker.ts index 1882cc76a..4d7a5ffba 100644 --- a/packages/fedify/src/federation/circuit-breaker.ts +++ b/packages/fedify/src/federation/circuit-breaker.ts @@ -703,7 +703,7 @@ export function normalizeCircuitBreakerOptions( return Temporal.Duration.compare(first.until(last), failureWindow) <= 0; }; stateTtl = configuredStateTtl ?? - maxDuration(recoveryDelay, heldActivityTtl, failureWindow); + maxDuration(recoveryDelay.add(failureWindow), heldActivityTtl); } else { failure = options.failure; pruneFailures = (timestamps) => @@ -750,11 +750,15 @@ function isLegacySweepDone( value.state === "done" && "retryUntil" in value && typeof value.retryUntil === "string" ) { - return Temporal.Instant.compare( - now, - Temporal.Instant.from(value.retryUntil), - ) < - 0; + try { + return Temporal.Instant.compare( + now, + Temporal.Instant.from(value.retryUntil), + ) < + 0; + } catch { + return false; + } } return false; } @@ -763,10 +767,21 @@ function isLegacySweepRetryDue( value: unknown, now: Temporal.Instant, ): value is Extract { - return typeof value === "object" && value != null && - "state" in value && value.state === "done" && - "retryUntil" in value && typeof value.retryUntil === "string" && - Temporal.Instant.compare(now, Temporal.Instant.from(value.retryUntil)) >= 0; + if ( + typeof value !== "object" || value == null || + !("state" in value) || value.state !== "done" || + !("retryUntil" in value) || typeof value.retryUntil !== "string" + ) { + return false; + } + try { + return Temporal.Instant.compare( + now, + Temporal.Instant.from(value.retryUntil), + ) >= 0; + } catch { + return false; + } } function isLegacySweepInProgress(value: unknown): boolean { From 38c3e7fffb2bb99ad38cd5c45e1ab3c91cb74f2a Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 5 Jul 2026 10:44:06 +0900 Subject: [PATCH 04/11] Cache completed circuit sweeps Remember completed legacy sweep markers in each CircuitBreaker instance so normal delivery paths stop polling the marker after the migration is done. The done marker is cached until retryUntil, while final markers are cached permanently. https://github.com/fedify-dev/fedify/pull/917#discussion_r3524132881 Assisted-by: Codex:gpt-5.5 --- .../src/federation/circuit-breaker.test.ts | 63 +++++++++++++++++++ .../fedify/src/federation/circuit-breaker.ts | 53 +++++++++++----- 2 files changed, 102 insertions(+), 14 deletions(-) diff --git a/packages/fedify/src/federation/circuit-breaker.test.ts b/packages/fedify/src/federation/circuit-breaker.test.ts index 06102d807..f9213585e 100644 --- a/packages/fedify/src/federation/circuit-breaker.test.ts +++ b/packages/fedify/src/federation/circuit-breaker.test.ts @@ -77,12 +77,26 @@ class CountingSetKvStore implements KvStore { class CountingSweepKvStore extends MemoryKvStore { listCalls = 0; + markerGetCalls = 0; casCalls: { key: KvKey; options: KvStoreSetOptions | undefined; }[] = []; deletedKeys: KvKey[] = []; + override get(key: KvKey): Promise { + if ( + key.length === 4 && + key[0] === "_fedify" && + key[1] === "circuit" && + key[2] === "__fedify_meta" && + key[3] === "circuit_breaker_state_ttl_sweep_v1" + ) { + this.markerGetCalls++; + } + return super.get(key); + } + override cas( key: KvKey, expectedValue: unknown, @@ -876,6 +890,55 @@ test("CircuitBreaker skips legacy sweep already running elsewhere", async () => }); }); +test("CircuitBreaker caches completed legacy sweep markers", async () => { + const doneKv = new CountingSweepKvStore(); + await doneKv.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { + state: "done", + retryUntil: "2026-06-01T00:00:00Z", + }); + let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); + const doneCircuit = new CircuitBreaker({ + kv: doneKv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { failureThreshold: 2 }, + }); + + await doneCircuit.recordFailure("remote.example"); + await doneCircuit.pendingSweep; + await doneCircuit.recordFailure("another.example"); + await doneCircuit.pendingSweep; + + assertEquals(doneKv.markerGetCalls, 1); + + now = Temporal.Instant.from("2026-06-02T00:00:00Z"); + await doneCircuit.recordFailure("final.example"); + await doneCircuit.pendingSweep; + + assertEquals(doneKv.markerGetCalls, 2); + + const finalKv = new CountingSweepKvStore(); + await markCircuitBreakerLegacySweepDone(finalKv); + const finalCircuit = new CircuitBreaker({ + kv: finalKv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { failureThreshold: 2 }, + }); + + await finalCircuit.recordFailure("remote.example"); + await finalCircuit.pendingSweep; + await finalCircuit.recordFailure("another.example"); + await finalCircuit.pendingSweep; + + assertEquals(finalKv.markerGetCalls, 1); +}); + test("CircuitBreaker ignores malformed legacy sweep retry markers", async () => { const kv = new CountingSweepKvStore(); await kv.set([ diff --git a/packages/fedify/src/federation/circuit-breaker.ts b/packages/fedify/src/federation/circuit-breaker.ts index 4d7a5ffba..a0a391f3b 100644 --- a/packages/fedify/src/federation/circuit-breaker.ts +++ b/packages/fedify/src/federation/circuit-breaker.ts @@ -217,6 +217,7 @@ export class CircuitBreaker { readonly #options: NormalizedCircuitBreakerOptions; readonly #now: () => Temporal.Instant; #legacySweep: Promise | undefined; + #legacySweepDone: Temporal.Instant | "final" | undefined; readonly #stateChangeObserver: | CircuitBreakerCreateOptions["stateChangeObserver"] | undefined; @@ -464,6 +465,7 @@ export class CircuitBreaker { #sweepLegacyStates(): void { if (this.#kv.cas == null) return; if (this.#legacySweep != null) return; + if (this.#isLegacySweepDone()) return; this.#legacySweep = this.#sweepLegacyStatesImpl() .catch((error) => { getLogger(["fedify", "federation", "circuit"]).warn( @@ -489,10 +491,9 @@ export class CircuitBreaker { await this.#deleteIfUnchanged(markerKey, marker); throw error; } - await this.#kv.set( - markerKey, - this.#finishLegacySweepMarker(marker), - ); + const finishedMarker = this.#finishLegacySweepMarker(marker); + await this.#kv.set(markerKey, finishedMarker); + this.#rememberLegacySweepMarker(finishedMarker); } #finishLegacySweepMarker(marker: LegacySweepMarker): LegacySweepMarker { @@ -539,7 +540,10 @@ export class CircuitBreaker { ): Promise { while (true) { const marker = await this.#kv.get(markerKey); - if (isLegacySweepDone(marker, this.#now())) return "done"; + if (isLegacySweepDone(marker, this.#now())) { + this.#rememberLegacySweepMarker(marker); + return "done"; + } if (isLegacySweepInProgress(marker)) { return "done"; } @@ -565,6 +569,21 @@ export class CircuitBreaker { } } + #isLegacySweepDone(): boolean { + if (this.#legacySweepDone === "final") return true; + if (this.#legacySweepDone == null) return false; + if (Temporal.Instant.compare(this.#now(), this.#legacySweepDone) < 0) { + return true; + } + this.#legacySweepDone = undefined; + return false; + } + + #rememberLegacySweepMarker(marker: unknown): void { + const doneUntil = getLegacySweepDoneUntil(marker, this.#now()); + if (doneUntil != null) this.#legacySweepDone = doneUntil; + } + #capHeldRetryAt( now: Temporal.Instant, heldSince: Temporal.Instant | undefined, @@ -742,25 +761,31 @@ function isLegacySweepDone( value: unknown, now: Temporal.Instant, ): value is LegacySweepMarker { + return getLegacySweepDoneUntil(value, now) != null; +} + +function getLegacySweepDoneUntil( + value: unknown, + now: Temporal.Instant, +): Temporal.Instant | "final" | undefined { if (typeof value !== "object" || value == null || !("state" in value)) { - return false; + return undefined; } - if (value.state === "final") return true; + if (value.state === "final") return "final"; if ( value.state === "done" && "retryUntil" in value && typeof value.retryUntil === "string" ) { try { - return Temporal.Instant.compare( - now, - Temporal.Instant.from(value.retryUntil), - ) < - 0; + const retryUntil = Temporal.Instant.from(value.retryUntil); + return Temporal.Instant.compare(now, retryUntil) < 0 + ? retryUntil + : undefined; } catch { - return false; + return undefined; } } - return false; + return undefined; } function isLegacySweepRetryDue( From 7c66ba1874e26fd44a3e0db299f7c9315dbe9f1e Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 5 Jul 2026 10:52:59 +0900 Subject: [PATCH 05/11] Harden sweep and TTL updates Keep legacy sweeps focused on host state keys and make their guarded CAS calls fail fast if a future refactor bypasses the CAS-store precondition. Refresh Postgres KV row timestamps on upsert so TTLs are measured from the latest write rather than the original row creation time. https://github.com/fedify-dev/fedify/pull/917#discussion_r3524137985 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524137989 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524142543 Assisted-by: Codex:gpt-5.5 --- .../src/federation/circuit-breaker.test.ts | 11 +++++ .../fedify/src/federation/circuit-breaker.ts | 7 ++-- packages/postgres/src/kv.test.ts | 40 +++++++++++++++++++ packages/postgres/src/kv.ts | 5 ++- 4 files changed, 59 insertions(+), 4 deletions(-) diff --git a/packages/fedify/src/federation/circuit-breaker.test.ts b/packages/fedify/src/federation/circuit-breaker.test.ts index f9213585e..f6b832f77 100644 --- a/packages/fedify/src/federation/circuit-breaker.test.ts +++ b/packages/fedify/src/federation/circuit-breaker.test.ts @@ -731,6 +731,10 @@ test("CircuitBreaker migrates legacy states without TTL once", async () => { }); await kv.set(["_fedify", "circuit", "malformed.example"], "stale"); await kv.set(["_fedify", "other", "untouched.example"], "keep"); + await kv.set(["_fedify", "circuit", "__fedify_meta", "future"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); const circuit = new CircuitBreaker({ kv, @@ -772,6 +776,13 @@ test("CircuitBreaker migrates legacy states without TTL once", async () => { await kv.get(["_fedify", "other", "untouched.example"]), "keep", ); + assertEquals( + await kv.get(["_fedify", "circuit", "__fedify_meta", "future"]), + { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }, + ); await circuit.recordFailure("remote.example"); assertEquals(await circuit.getState("remote.example"), { state: "closed", diff --git a/packages/fedify/src/federation/circuit-breaker.ts b/packages/fedify/src/federation/circuit-breaker.ts index a0a391f3b..f36678059 100644 --- a/packages/fedify/src/federation/circuit-breaker.ts +++ b/packages/fedify/src/federation/circuit-breaker.ts @@ -485,6 +485,7 @@ export class CircuitBreaker { try { for await (const { key, value } of this.#kv.list(this.#prefix)) { if (isEqualKvKey(key, markerKey)) continue; + if (key.length !== this.#prefix.length + 1) continue; await this.#migrateLegacyState(key, value); } } catch (error) { @@ -514,7 +515,7 @@ export class CircuitBreaker { if (isCurrentCircuitBreakerState(value)) return; const state = parseCircuitBreakerKvState(value); if (state != null) { - await this.#kv.cas?.( + await this.#kv.cas!( key, value, markCircuitBreakerState(state), @@ -527,7 +528,7 @@ export class CircuitBreaker { async #deleteIfUnchanged(key: KvKey, value: unknown): Promise { if ( - await this.#kv.cas?.(key, value, LEGACY_SWEEP_DELETING_MARKER, { + await this.#kv.cas!(key, value, LEGACY_SWEEP_DELETING_MARKER, { ttl: LEGACY_SWEEP_LOCK_TTL, }) ) { @@ -559,7 +560,7 @@ export class CircuitBreaker { retryUntil, } satisfies LegacySweepMarker; if ( - await this.#kv.cas?.(markerKey, marker ?? undefined, sweeping, { + await this.#kv.cas!(markerKey, marker ?? undefined, sweeping, { ttl: LEGACY_SWEEP_LOCK_TTL, }) ) { diff --git a/packages/postgres/src/kv.test.ts b/packages/postgres/src/kv.test.ts index aaee02954..8e80e1b97 100644 --- a/packages/postgres/src/kv.test.ts +++ b/packages/postgres/src/kv.test.ts @@ -107,6 +107,46 @@ test("PostgresKvStore.set()", { skip: dbUrl == null }, async () => { } }); +test( + "PostgresKvStore.set() refreshes TTL origin on update", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + + const { sql, tableName, store } = getStore(); + try { + await store.initialize(); + await sql` + INSERT INTO ${sql(tableName)} (key, value, created) + VALUES ( + ${["ttl", "origin"]}, + ${"stale"}, + CURRENT_TIMESTAMP - INTERVAL '2 days' + ) + `; + + await store.set(["ttl", "origin"], "fresh", { + ttl: Temporal.Duration.from({ days: 1 }), + }); + + assert.strictEqual(await store.get(["ttl", "origin"]), "fresh"); + const result = await sql` + SELECT created + FROM ${sql(tableName)} + WHERE key = ${["ttl", "origin"]} + `; + assert.strictEqual(result.length, 1); + assert( + result[0].created > new Date(Date.now() - 60_000), + "created timestamp should be refreshed on TTL update", + ); + } finally { + await store.drop(); + await sql.end(); + } + }, +); + test("PostgresKvStore.delete()", { skip: dbUrl == null }, async () => { if (dbUrl == null) return; // Bun does not support skip option diff --git a/packages/postgres/src/kv.ts b/packages/postgres/src/kv.ts index e289bca3a..13cad9511 100644 --- a/packages/postgres/src/kv.ts +++ b/packages/postgres/src/kv.ts @@ -98,7 +98,10 @@ export class PostgresKvStore implements KvStore { ${ttl} ) ON CONFLICT (key) - DO UPDATE SET value = EXCLUDED.value, ttl = EXCLUDED.ttl; + DO UPDATE SET + value = EXCLUDED.value, + created = CURRENT_TIMESTAMP, + ttl = EXCLUDED.ttl; `; await this.#expire(); } From 9afca89d57a11c5847da544453341375ead489c3 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 5 Jul 2026 11:02:58 +0900 Subject: [PATCH 06/11] Harden circuit sweep expiry Let stale legacy sweep markers be replaced after the lock TTL so a crashed or failed worker cannot block future cleanup forever. Extend the default state TTL to cover held messages created near the end of the recovery delay. https://github.com/fedify-dev/fedify/pull/917#discussion_r3524153731 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524153737 https://github.com/fedify-dev/fedify/pull/917#discussion_r3524158679 Assisted-by: Codex:gpt-5.5 --- .../src/federation/circuit-breaker.test.ts | 47 ++++++++++- .../fedify/src/federation/circuit-breaker.ts | 78 +++++++++++++++---- 2 files changed, 109 insertions(+), 16 deletions(-) diff --git a/packages/fedify/src/federation/circuit-breaker.test.ts b/packages/fedify/src/federation/circuit-breaker.test.ts index f6b832f77..a8f9dfae6 100644 --- a/packages/fedify/src/federation/circuit-breaker.test.ts +++ b/packages/fedify/src/federation/circuit-breaker.test.ts @@ -655,7 +655,7 @@ test("CircuitBreaker writes stored states with a TTL", async () => { await circuit.recordFailure("remote.example"); assertEquals(kv.options.at(-1), { - ttl: Temporal.Duration.from({ days: 2 }), + ttl: Temporal.Duration.from({ days: 2, hours: 1 }), }); }); @@ -677,7 +677,7 @@ test("CircuitBreaker writes stored states with a TTL without CAS", async () => { assertEquals(kv.keys, [["_fedify", "circuit", "remote.example"]]); assertEquals(kv.options.at(-1), { - ttl: Temporal.Duration.from({ days: 2 }), + ttl: Temporal.Duration.from({ days: 2, hours: 1 }), }); }); @@ -901,6 +901,49 @@ test("CircuitBreaker skips legacy sweep already running elsewhere", async () => }); }); +test("CircuitBreaker retries expired legacy sweep markers", async () => { + const kv = new CountingSweepKvStore(); + await kv.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { + state: "sweeping", + started: "2026-05-25T00:00:00Z", + retryUntil: "2026-06-01T00:00:00Z", + }); + await kv.set(["_fedify", "circuit", "stale.example"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:06:00Z"), + options: { failureThreshold: 2 }, + }); + + await circuit.recordFailure("remote.example"); + await circuit.pendingSweep; + + assertEquals(kv.listCalls, 1); + assertEquals(await kv.get(["_fedify", "circuit", "stale.example"]), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + __fedifyCircuitBreakerStateVersion: 1, + }); + assertEquals( + await kv.get([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ]), + { state: "done", retryUntil: "2026-06-01T00:00:00Z" }, + ); +}); + test("CircuitBreaker caches completed legacy sweep markers", async () => { const doneKv = new CountingSweepKvStore(); await doneKv.set([ diff --git a/packages/fedify/src/federation/circuit-breaker.ts b/packages/fedify/src/federation/circuit-breaker.ts index f36678059..2cbb37cc8 100644 --- a/packages/fedify/src/federation/circuit-breaker.ts +++ b/packages/fedify/src/federation/circuit-breaker.ts @@ -498,15 +498,16 @@ export class CircuitBreaker { } #finishLegacySweepMarker(marker: LegacySweepMarker): LegacySweepMarker { + const retryUntil = getLegacySweepRetryUntil(marker); if ( - "retryUntil" in marker && + retryUntil != null && Temporal.Instant.compare( this.#now(), - Temporal.Instant.from(marker.retryUntil), + Temporal.Instant.from(retryUntil), ) < 0 ) { - return { state: "done", retryUntil: marker.retryUntil }; + return { state: "done", retryUntil }; } return { state: "final" }; } @@ -541,22 +542,24 @@ export class CircuitBreaker { ): Promise { while (true) { const marker = await this.#kv.get(markerKey); - if (isLegacySweepDone(marker, this.#now())) { + const now = this.#now(); + if (isLegacySweepActive(marker, now)) { this.#rememberLegacySweepMarker(marker); return "done"; } - if (isLegacySweepInProgress(marker)) { - return "done"; - } - if (marker != null && !isLegacySweepRetryDue(marker, this.#now())) { + if ( + marker != null && !isLegacySweepRetryDue(marker, now) && + !isLegacySweepStale(marker, now) + ) { return "done"; } - const retryUntil = isLegacySweepRetryDue(marker, this.#now()) + const retryUntil = isLegacySweepRetryDue(marker, now) ? marker.retryUntil - : this.#now().add(LEGACY_SWEEP_RETRY_WINDOW).toString(); + : getLegacySweepRetryUntil(marker) ?? + now.add(LEGACY_SWEEP_RETRY_WINDOW).toString(); const sweeping = { state: "sweeping", - started: this.#now().toString(), + started: now.toString(), retryUntil, } satisfies LegacySweepMarker; if ( @@ -723,7 +726,10 @@ export function normalizeCircuitBreakerOptions( return Temporal.Duration.compare(first.until(last), failureWindow) <= 0; }; stateTtl = configuredStateTtl ?? - maxDuration(recoveryDelay.add(failureWindow), heldActivityTtl); + maxDuration( + recoveryDelay.add(failureWindow), + recoveryDelay.add(heldActivityTtl), + ); } else { failure = options.failure; pruneFailures = (timestamps) => @@ -810,9 +816,53 @@ function isLegacySweepRetryDue( } } -function isLegacySweepInProgress(value: unknown): boolean { +function isLegacySweepActive(value: unknown, now: Temporal.Instant): boolean { + return isLegacySweepDone(value, now) || isLegacySweepInProgress(value, now); +} + +function isLegacySweepInProgress( + value: unknown, + now: Temporal.Instant, +): boolean { + if ( + typeof value !== "object" || value == null || + !("state" in value) || value.state !== "sweeping" || + !("started" in value) || typeof value.started !== "string" + ) { + return false; + } + try { + return Temporal.Instant.compare( + now, + Temporal.Instant.from(value.started).add(LEGACY_SWEEP_LOCK_TTL), + ) < 0; + } catch { + return false; + } +} + +function isLegacySweepStale( + value: unknown, + now: Temporal.Instant, +): value is Extract { return typeof value === "object" && value != null && - "state" in value && value.state === "sweeping"; + "state" in value && value.state === "sweeping" && + !isLegacySweepInProgress(value, now); +} + +function getLegacySweepRetryUntil(value: unknown): string | undefined { + if ( + typeof value !== "object" || value == null || + !("retryUntil" in value) || typeof value.retryUntil !== "string" + ) { + return undefined; + } + try { + Temporal.Instant.from(value.retryUntil); + return value.retryUntil; + } catch { + return undefined; + } } function isCurrentCircuitBreakerState(value: unknown): boolean { From d76e21daf9209940741a1c5d89869c9945a0d7a2 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 5 Jul 2026 11:10:46 +0900 Subject: [PATCH 07/11] Fix circuit TTL test expectation Update the normalized circuit breaker option test to match the derived state TTL after including the recovery delay on the held-activity side of the calculation. https://github.com/fedify-dev/fedify/pull/917#discussion_r3524168362 Assisted-by: Codex:gpt-5.5 --- packages/fedify/src/federation/circuit-breaker.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/fedify/src/federation/circuit-breaker.test.ts b/packages/fedify/src/federation/circuit-breaker.test.ts index a8f9dfae6..3ba9d78a8 100644 --- a/packages/fedify/src/federation/circuit-breaker.test.ts +++ b/packages/fedify/src/federation/circuit-breaker.test.ts @@ -196,7 +196,7 @@ test("normalizeCircuitBreakerOptions() uses numeric failure policy", () => { ); assertEquals( options.stateTtl, - Temporal.Duration.from({ hours: 2, minutes: 10 }), + Temporal.Duration.from({ hours: 3 }), ); }); From 1a352bf84b1c5eb0f6b624a96292697c89595ba6 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 5 Jul 2026 23:00:28 +0900 Subject: [PATCH 08/11] Retry malformed circuit sweep markers Treat malformed legacy sweep markers as replaceable migration metadata instead of a completed sweep. This lets the cleanup path recover if a bad marker is left in the KV store and prevents old no-TTL state from being kept forever. https://github.com/fedify-dev/fedify/pull/917#pullrequestreview-4631302038 Assisted-by: Codex:gpt-5.5 --- .../src/federation/circuit-breaker.test.ts | 22 +++++++++++++++++-- .../fedify/src/federation/circuit-breaker.ts | 13 ++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/packages/fedify/src/federation/circuit-breaker.test.ts b/packages/fedify/src/federation/circuit-breaker.test.ts index 3ba9d78a8..e055a66b6 100644 --- a/packages/fedify/src/federation/circuit-breaker.test.ts +++ b/packages/fedify/src/federation/circuit-breaker.test.ts @@ -993,7 +993,7 @@ test("CircuitBreaker caches completed legacy sweep markers", async () => { assertEquals(finalKv.markerGetCalls, 1); }); -test("CircuitBreaker ignores malformed legacy sweep retry markers", async () => { +test("CircuitBreaker retries malformed legacy sweep markers", async () => { const kv = new CountingSweepKvStore(); await kv.set([ "_fedify", @@ -1004,6 +1004,10 @@ test("CircuitBreaker ignores malformed legacy sweep retry markers", async () => state: "done", retryUntil: "not an instant", }); + await kv.set(["_fedify", "circuit", "stale.example"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); const circuit = new CircuitBreaker({ kv, prefix: ["_fedify", "circuit"], @@ -1014,11 +1018,25 @@ test("CircuitBreaker ignores malformed legacy sweep retry markers", async () => await circuit.recordFailure("remote.example"); await circuit.pendingSweep; - assertEquals(kv.listCalls, 0); + assertEquals(kv.listCalls, 1); + assertEquals(await kv.get(["_fedify", "circuit", "stale.example"]), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + __fedifyCircuitBreakerStateVersion: 1, + }); assertEquals(await circuit.getState("remote.example"), { state: "closed", failures: ["2026-05-25T00:00:00Z"], }); + assertEquals( + await kv.get([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ]), + { state: "done", retryUntil: "2026-06-01T00:00:00Z" }, + ); }); test("CircuitBreaker retries legacy sweep after transient failures", async () => { diff --git a/packages/fedify/src/federation/circuit-breaker.ts b/packages/fedify/src/federation/circuit-breaker.ts index 2cbb37cc8..e458b1d07 100644 --- a/packages/fedify/src/federation/circuit-breaker.ts +++ b/packages/fedify/src/federation/circuit-breaker.ts @@ -548,7 +548,8 @@ export class CircuitBreaker { return "done"; } if ( - marker != null && !isLegacySweepRetryDue(marker, now) && + isLegacySweepMarker(marker, now) && + !isLegacySweepRetryDue(marker, now) && !isLegacySweepStale(marker, now) ) { return "done"; @@ -820,6 +821,16 @@ function isLegacySweepActive(value: unknown, now: Temporal.Instant): boolean { return isLegacySweepDone(value, now) || isLegacySweepInProgress(value, now); } +function isLegacySweepMarker( + value: unknown, + now: Temporal.Instant, +): value is LegacySweepMarker { + return isLegacySweepDone(value, now) || + isLegacySweepRetryDue(value, now) || + isLegacySweepInProgress(value, now) || + isLegacySweepStale(value, now); +} + function isLegacySweepInProgress( value: unknown, now: Temporal.Instant, From e77306d70c2e91037ca78a11400033a3e04bb674 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sun, 5 Jul 2026 23:10:37 +0900 Subject: [PATCH 09/11] Clean up circuit review follow-ups Normalize Postgres KV TTL values before storing them so DurationLike objects do not become invalid interval strings. Remove the unreachable legacy sweep marker branch, and document how stateTtl controls stored circuit state retention for default and custom failure policies. https://github.com/fedify-dev/fedify/pull/917#discussion_r3525025105 https://github.com/fedify-dev/fedify/pull/917#discussion_r3525025108 https://github.com/fedify-dev/fedify/pull/917#discussion_r3525029380 Assisted-by: Codex:gpt-5.5 --- docs/manual/circuit-breaker.md | 22 ++++++++++++++++ .../fedify/src/federation/circuit-breaker.ts | 26 ------------------- packages/postgres/src/kv.test.ts | 10 +++++++ packages/postgres/src/kv.ts | 4 ++- 4 files changed, 35 insertions(+), 27 deletions(-) diff --git a/docs/manual/circuit-breaker.md b/docs/manual/circuit-breaker.md index f0dd0be3c..5090047b3 100644 --- a/docs/manual/circuit-breaker.md +++ b/docs/manual/circuit-breaker.md @@ -154,6 +154,28 @@ For multi-worker deployments, use a `KvStore` implementation that supports Fedify still works without CAS, but it logs a warning because concurrent workers can race when opening or closing the same host's circuit. +Fedify expires stored circuit state after it is no longer useful. With the +default numeric failure policy, the default `stateTtl` is derived from +`failureWindow`, `recoveryDelay`, and `heldActivityTtl` so failure history, +recovery probes, and held activities all have enough time to complete. If you +provide a custom `failure` callback, Fedify cannot infer how long your policy +needs its timestamp history, so stored state does not expire by default. Set +`stateTtl` explicitly when using a custom policy and you want circuit state to +be cleaned up automatically: + +~~~~ typescript +const federation = createFederation({ + kv, + queue, + circuitBreaker: { + failure(timestamps) { + return timestamps.length >= 10; + }, + stateTtl: { days: 14 }, + }, +}); +~~~~ + Observability ------------- diff --git a/packages/fedify/src/federation/circuit-breaker.ts b/packages/fedify/src/federation/circuit-breaker.ts index e458b1d07..5ec61a0fe 100644 --- a/packages/fedify/src/federation/circuit-breaker.ts +++ b/packages/fedify/src/federation/circuit-breaker.ts @@ -547,13 +547,6 @@ export class CircuitBreaker { this.#rememberLegacySweepMarker(marker); return "done"; } - if ( - isLegacySweepMarker(marker, now) && - !isLegacySweepRetryDue(marker, now) && - !isLegacySweepStale(marker, now) - ) { - return "done"; - } const retryUntil = isLegacySweepRetryDue(marker, now) ? marker.retryUntil : getLegacySweepRetryUntil(marker) ?? @@ -821,16 +814,6 @@ function isLegacySweepActive(value: unknown, now: Temporal.Instant): boolean { return isLegacySweepDone(value, now) || isLegacySweepInProgress(value, now); } -function isLegacySweepMarker( - value: unknown, - now: Temporal.Instant, -): value is LegacySweepMarker { - return isLegacySweepDone(value, now) || - isLegacySweepRetryDue(value, now) || - isLegacySweepInProgress(value, now) || - isLegacySweepStale(value, now); -} - function isLegacySweepInProgress( value: unknown, now: Temporal.Instant, @@ -852,15 +835,6 @@ function isLegacySweepInProgress( } } -function isLegacySweepStale( - value: unknown, - now: Temporal.Instant, -): value is Extract { - return typeof value === "object" && value != null && - "state" in value && value.state === "sweeping" && - !isLegacySweepInProgress(value, now); -} - function getLegacySweepRetryUntil(value: unknown): string | undefined { if ( typeof value !== "object" || value == null || diff --git a/packages/postgres/src/kv.test.ts b/packages/postgres/src/kv.test.ts index 8e80e1b97..d79b8acd1 100644 --- a/packages/postgres/src/kv.test.ts +++ b/packages/postgres/src/kv.test.ts @@ -92,6 +92,16 @@ test("PostgresKvStore.set()", { skip: dbUrl == null }, async () => { assert.strictEqual(result2[0].value, "qux"); assert.strictEqual(result2[0].ttl, "1 day"); + await store.set(["foo", "duration-like"], "duration-like", { + ttl: { hours: 1 } as Temporal.Duration, + }); + const durationLikeResult = await sql` + SELECT * FROM ${sql(tableName)} + WHERE key = ${["foo", "duration-like"]} + `; + assert.strictEqual(durationLikeResult.length, 1); + assert.strictEqual(durationLikeResult[0].ttl, "01:00:00"); + await store.set(["foo", "quux"], true); const result3 = await sql` SELECT * FROM ${sql(tableName)} diff --git a/packages/postgres/src/kv.ts b/packages/postgres/src/kv.ts index 13cad9511..d29a9d070 100644 --- a/packages/postgres/src/kv.ts +++ b/packages/postgres/src/kv.ts @@ -89,7 +89,9 @@ export class PostgresKvStore implements KvStore { options?: KvStoreSetOptions | undefined, ): Promise { await this.initialize(); - const ttl = options?.ttl == null ? null : options.ttl.toString(); + const ttl = options?.ttl == null + ? null + : Temporal.Duration.from(options.ttl).toString(); await this.#sql` INSERT INTO ${this.#sql(this.#tableName)} (key, value, ttl) VALUES ( From ebf154c86ebd11ad2476528d6d60b6c67e62cd96 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 6 Jul 2026 01:18:21 +0900 Subject: [PATCH 10/11] Harden legacy circuit sweeps Legacy circuit sweeps are best-effort cleanup, so they should not create extra load or overwrite marker state when they cannot reclaim leaked data. Skip the sweep for custom failure policies without a state TTL, finish the sweep marker with CAS, and bound marker acquisition retries. https://github.com/fedify-dev/fedify/pull/917#issuecomment-4886666958 Assisted-by: Codex:gpt-5.5 --- .../src/federation/circuit-breaker.test.ts | 106 ++++++++++++++++++ .../fedify/src/federation/circuit-breaker.ts | 10 +- 2 files changed, 113 insertions(+), 3 deletions(-) diff --git a/packages/fedify/src/federation/circuit-breaker.test.ts b/packages/fedify/src/federation/circuit-breaker.test.ts index e055a66b6..81bcc7ac3 100644 --- a/packages/fedify/src/federation/circuit-breaker.test.ts +++ b/packages/fedify/src/federation/circuit-breaker.test.ts @@ -118,6 +118,44 @@ class CountingSweepKvStore extends MemoryKvStore { } } +class ConflictingSweepMarkerKvStore extends CountingSweepKvStore { + attempts = 0; + + override cas( + key: KvKey, + expectedValue: unknown, + newValue: unknown, + options?: KvStoreSetOptions, + ): Promise { + if ( + key.length === 4 && + key[0] === "_fedify" && + key[1] === "circuit" && + key[2] === "__fedify_meta" && + key[3] === "circuit_breaker_state_ttl_sweep_v1" + ) { + this.attempts++; + if (this.attempts > 3) { + throw new Error("legacy sweep did not stop retrying CAS misses"); + } + return Promise.resolve(false); + } + return super.cas(key, expectedValue, newValue, options); + } +} + +class MarkerChangedDuringListKvStore extends CountingSweepKvStore { + override async *list(prefix?: KvKey) { + yield* super.list(prefix); + await super.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { state: "final" }); + } +} + class UpdatingDuringListKvStore extends MemoryKvStore { override async *list(prefix?: KvKey) { for await (const entry of super.list(prefix)) { @@ -718,6 +756,31 @@ test("CircuitBreaker accepts an explicit TTL with custom failure policies", asyn }); }); +test("CircuitBreaker skips legacy sweep when custom policies have no TTL", async () => { + const kv = new CountingSweepKvStore(); + await kv.set(["_fedify", "circuit", "stale.example"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { + failure: (timestamps) => timestamps.length >= 2, + }, + }); + + await circuit.recordFailure("remote.example"); + await circuit.pendingSweep; + + assertEquals(kv.listCalls, 0); + assertEquals(await kv.get(["_fedify", "circuit", "stale.example"]), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); +}); + test("CircuitBreaker migrates legacy states without TTL once", async () => { const kv = new CountingSweepKvStore(); await kv.set(["_fedify", "circuit", "stale-a.example"], { @@ -901,6 +964,49 @@ test("CircuitBreaker skips legacy sweep already running elsewhere", async () => }); }); +test("CircuitBreaker does not overwrite a changed legacy sweep marker", async () => { + const kv = new MarkerChangedDuringListKvStore(); + await kv.set(["_fedify", "circuit", "stale.example"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { failureThreshold: 2 }, + }); + + await circuit.recordFailure("remote.example"); + await circuit.pendingSweep; + + assertEquals( + await kv.get([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ]), + { state: "final" }, + ); +}); + +test("CircuitBreaker bounds legacy sweep CAS retries", async () => { + const kv = new ConflictingSweepMarkerKvStore(); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { failureThreshold: 2 }, + }); + + await circuit.getState("remote.example"); + await circuit.pendingSweep; + + assertEquals(kv.attempts, 3); + assertEquals(kv.listCalls, 0); +}); + test("CircuitBreaker retries expired legacy sweep markers", async () => { const kv = new CountingSweepKvStore(); await kv.set([ diff --git a/packages/fedify/src/federation/circuit-breaker.ts b/packages/fedify/src/federation/circuit-breaker.ts index 5ec61a0fe..c4e28a946 100644 --- a/packages/fedify/src/federation/circuit-breaker.ts +++ b/packages/fedify/src/federation/circuit-breaker.ts @@ -150,6 +150,7 @@ const CIRCUIT_BREAKER_STATE_VERSION = 1; const LEGACY_SWEEP_LOCK_TTL = Temporal.Duration.from({ minutes: 5 }); const LEGACY_SWEEP_RETRY_WINDOW = Temporal.Duration.from({ hours: 24 * 7 }); const LEGACY_SWEEP_WAIT_INTERVAL = 10; +const LEGACY_SWEEP_CAS_ATTEMPTS = 3; type LegacySweepMarker = | { @@ -464,6 +465,7 @@ export class CircuitBreaker { #sweepLegacyStates(): void { if (this.#kv.cas == null) return; + if (this.#options.stateTtl == null) return; if (this.#legacySweep != null) return; if (this.#isLegacySweepDone()) return; this.#legacySweep = this.#sweepLegacyStatesImpl() @@ -493,8 +495,9 @@ export class CircuitBreaker { throw error; } const finishedMarker = this.#finishLegacySweepMarker(marker); - await this.#kv.set(markerKey, finishedMarker); - this.#rememberLegacySweepMarker(finishedMarker); + if (await this.#kv.cas!(markerKey, marker, finishedMarker)) { + this.#rememberLegacySweepMarker(finishedMarker); + } } #finishLegacySweepMarker(marker: LegacySweepMarker): LegacySweepMarker { @@ -540,7 +543,7 @@ export class CircuitBreaker { async #acquireLegacySweep( markerKey: KvKey, ): Promise { - while (true) { + for (let attempt = 0; attempt < LEGACY_SWEEP_CAS_ATTEMPTS; attempt++) { const marker = await this.#kv.get(markerKey); const now = this.#now(); if (isLegacySweepActive(marker, now)) { @@ -565,6 +568,7 @@ export class CircuitBreaker { } await delay(LEGACY_SWEEP_WAIT_INTERVAL); } + return "done"; } #isLegacySweepDone(): boolean { From efd6d014f9369f72c9018f667dcf6d98b3bef385 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Mon, 6 Jul 2026 03:27:07 +0900 Subject: [PATCH 11/11] Clarify circuit sweep limits The automatic legacy sweep only applies when the configured key-value store supports CAS, so document that Postgres and Redis users need manual cleanup for circuit state written without TTLs by earlier 2.3 releases. Also remove the redundant marker-key check from the sweep loop. https://github.com/fedify-dev/fedify/pull/917#issuecomment-4886772029 Assisted-by: Codex:gpt-5.5 --- docs/manual/circuit-breaker.md | 8 ++++++++ packages/fedify/src/federation/circuit-breaker.ts | 6 ------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/manual/circuit-breaker.md b/docs/manual/circuit-breaker.md index 5090047b3..e602d7448 100644 --- a/docs/manual/circuit-breaker.md +++ b/docs/manual/circuit-breaker.md @@ -176,6 +176,14 @@ const federation = createFederation({ }); ~~~~ +When upgrading from Fedify 2.3.0 or 2.3.1, Fedify automatically rewrites +legacy circuit state with a TTL only on `KvStore` implementations that support +`cas()`. Stores without CAS, including `@fedify/postgres` and +`@fedify/redis`, apply TTLs to new circuit state but cannot automatically +clean up circuit state that was already written without a TTL. If that old +state matters for your deployment, remove it with a one-off cleanup script or +add a TTL directly in your storage backend. + Observability ------------- diff --git a/packages/fedify/src/federation/circuit-breaker.ts b/packages/fedify/src/federation/circuit-breaker.ts index c4e28a946..0b07e0c48 100644 --- a/packages/fedify/src/federation/circuit-breaker.ts +++ b/packages/fedify/src/federation/circuit-breaker.ts @@ -486,7 +486,6 @@ export class CircuitBreaker { if (marker === "done") return; try { for await (const { key, value } of this.#kv.list(this.#prefix)) { - if (isEqualKvKey(key, markerKey)) continue; if (key.length !== this.#prefix.length + 1) continue; await this.#migrateLegacyState(key, value); } @@ -757,11 +756,6 @@ function maxDuration( ); } -function isEqualKvKey(left: KvKey, right: KvKey): boolean { - return left.length === right.length && - left.every((part, index) => part === right[index]); -} - function isLegacySweepDone( value: unknown, now: Temporal.Instant,