diff --git a/CHANGES.md b/CHANGES.md index 8f3619777..5c2d43b14 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,19 @@ Version 2.3.2 To be released. +### @fedify/fedify + + - Fixed the outbound delivery circuit breaker retaining per-host state in the + configured key–value store forever when a remote host never recovered. + Circuit breaker state now receives a TTL on writes made with the default + failure policy, custom failure policies can opt in with the new `stateTtl` + option, and stale state written by earlier 2.3 releases is cleared + automatically on CAS-backed stores after upgrade, with another sweep after a + grace window to cover rolling deployments. [[#916], [#917]] + +[#916]: https://github.com/fedify-dev/fedify/issues/916 +[#917]: https://github.com/fedify-dev/fedify/pull/917 + Version 2.3.1 ------------- diff --git a/packages/fedify/src/federation/circuit-breaker-test-utils.ts b/packages/fedify/src/federation/circuit-breaker-test-utils.ts new file mode 100644 index 000000000..13fcb0e11 --- /dev/null +++ b/packages/fedify/src/federation/circuit-breaker-test-utils.ts @@ -0,0 +1,12 @@ +import type { KvStore } from "./kv.ts"; + +export function markCircuitBreakerLegacySweepDone( + kv: Pick, +): Promise { + return kv.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { state: "final" }); +} diff --git a/packages/fedify/src/federation/circuit-breaker.test.ts b/packages/fedify/src/federation/circuit-breaker.test.ts index f1d8cc3cd..3ba9d78a8 100644 --- a/packages/fedify/src/federation/circuit-breaker.test.ts +++ b/packages/fedify/src/federation/circuit-breaker.test.ts @@ -5,7 +5,14 @@ import { normalizeCircuitBreakerOptions, parseCircuitBreakerKvState, } from "./circuit-breaker.ts"; -import { type KvKey, type KvStoreSetOptions, MemoryKvStore } from "./kv.ts"; +import { + type KvKey, + type KvStore, + type KvStoreListEntry, + type KvStoreSetOptions, + MemoryKvStore, +} from "./kv.ts"; +import { markCircuitBreakerLegacySweepDone } from "./circuit-breaker-test-utils.ts"; class AlwaysConflictingKvStore extends MemoryKvStore { attempts = 0; @@ -26,6 +33,7 @@ class AlwaysConflictingKvStore extends MemoryKvStore { class CountingCasKvStore extends MemoryKvStore { attempts = 0; + options: (KvStoreSetOptions | undefined)[] = []; override cas( key: KvKey, @@ -34,14 +42,125 @@ class CountingCasKvStore extends MemoryKvStore { options?: KvStoreSetOptions, ): Promise { this.attempts++; + this.options.push(options); + return super.cas(key, expectedValue, newValue, options); + } +} + +class CountingSetKvStore implements KvStore { + #store = new MemoryKvStore(); + keys: KvKey[] = []; + options: (KvStoreSetOptions | undefined)[] = []; + + get(key: KvKey): Promise { + return this.#store.get(key); + } + + set( + key: KvKey, + value: unknown, + options?: KvStoreSetOptions, + ): Promise { + this.keys.push(key); + this.options.push(options); + return this.#store.set(key, value, options); + } + + delete(key: KvKey): Promise { + return this.#store.delete(key); + } + + list(prefix?: KvKey): AsyncIterable { + return this.#store.list(prefix); + } +} + +class CountingSweepKvStore extends MemoryKvStore { + listCalls = 0; + markerGetCalls = 0; + casCalls: { + key: KvKey; + options: KvStoreSetOptions | undefined; + }[] = []; + deletedKeys: KvKey[] = []; + + override get(key: KvKey): Promise { + if ( + key.length === 4 && + key[0] === "_fedify" && + key[1] === "circuit" && + key[2] === "__fedify_meta" && + key[3] === "circuit_breaker_state_ttl_sweep_v1" + ) { + this.markerGetCalls++; + } + return super.get(key); + } + + override cas( + key: KvKey, + expectedValue: unknown, + newValue: unknown, + options?: KvStoreSetOptions, + ): Promise { + this.casCalls.push({ key, options }); return super.cas(key, expectedValue, newValue, options); } + + override async *list(prefix?: KvKey) { + this.listCalls++; + yield* super.list(prefix); + } + + override delete(key: KvKey): Promise { + this.deletedKeys.push(key); + return super.delete(key); + } +} + +class UpdatingDuringListKvStore extends MemoryKvStore { + override async *list(prefix?: KvKey) { + for await (const entry of super.list(prefix)) { + if ( + entry.key.length === 3 && + entry.key[0] === "_fedify" && + entry.key[1] === "circuit" && + entry.key[2] === "racy.example" + ) { + await super.set(entry.key, { + state: "closed", + failures: ["2026-05-25T00:01:00Z"], + }); + } + yield entry; + } + } +} + +class FailingOnceSweepKvStore extends MemoryKvStore { + listCalls = 0; + deletedKeys: KvKey[] = []; + + override async *list(prefix?: KvKey) { + this.listCalls++; + if (this.listCalls === 1) { + throw new Error("transient list failure"); + } + yield* super.list(prefix); + } + + override delete(key: KvKey): Promise { + this.deletedKeys.push(key); + return super.delete(key); + } } test("normalizeCircuitBreakerOptions() uses numeric failure policy", () => { const options = normalizeCircuitBreakerOptions({ failureThreshold: 3, failureWindow: { minutes: 10 }, + heldActivityTtl: { hours: 1 }, + recoveryDelay: { hours: 2 }, }); const failures = [ Temporal.Instant.from("2026-05-25T00:00:00Z"), @@ -75,6 +194,10 @@ test("normalizeCircuitBreakerOptions() uses numeric failure policy", () => { "2026-05-25T00:12:00Z", ], ); + assertEquals( + options.stateTtl, + Temporal.Duration.from({ hours: 3 }), + ); }); test("normalizeCircuitBreakerOptions() validates numeric failure policy", () => { @@ -167,6 +290,7 @@ test("parseCircuitBreakerKvState() validates stored shape", () => { failures: ["2026-05-25T00:00:00Z"], opened: "2026-05-25T00:00:00Z", halfOpened: "2026-05-25T00:00:00Z", + __fedifyCircuitBreakerStateVersion: 1, }), { state: "open", @@ -298,6 +422,7 @@ test("CircuitBreaker opens, probes, closes, and drops held activities", async () test("CircuitBreaker recovers stale half-open probes", async () => { const kv = new MemoryKvStore(); + await markCircuitBreakerLegacySweepDone(kv); let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); const circuit = new CircuitBreaker({ kv, @@ -343,6 +468,7 @@ test("CircuitBreaker recovers stale half-open probes", async () => { test("CircuitBreaker caps held delays at activity TTL", async () => { const kv = new MemoryKvStore(); + await markCircuitBreakerLegacySweepDone(kv); const now = Temporal.Instant.from("2026-05-25T00:05:00Z"); const circuit = new CircuitBreaker({ kv, @@ -402,6 +528,7 @@ test("CircuitBreaker caps held delays at activity TTL", async () => { test("CircuitBreaker ignores malformed held timestamps", async () => { const kv = new MemoryKvStore(); + await markCircuitBreakerLegacySweepDone(kv); const now = Temporal.Instant.from("2026-05-25T00:05:00Z"); const circuit = new CircuitBreaker({ kv, @@ -430,6 +557,7 @@ test("CircuitBreaker ignores malformed held timestamps", async () => { test("CircuitBreaker bounds beforeSend CAS retries", async () => { let kv = new AlwaysConflictingKvStore(); + await markCircuitBreakerLegacySweepDone(kv); const now = Temporal.Instant.from("2026-05-25T00:30:00Z"); let circuit = new CircuitBreaker({ kv, @@ -456,6 +584,7 @@ test("CircuitBreaker bounds beforeSend CAS retries", async () => { }); kv = new AlwaysConflictingKvStore(); + await markCircuitBreakerLegacySweepDone(kv); circuit = new CircuitBreaker({ kv, prefix: ["_fedify", "circuit"], @@ -484,6 +613,7 @@ test("CircuitBreaker bounds beforeSend CAS retries", async () => { test("CircuitBreaker skips recording failures for open circuits", async () => { const kv = new CountingCasKvStore(); + await markCircuitBreakerLegacySweepDone(kv); const circuit = new CircuitBreaker({ kv, prefix: ["_fedify", "circuit"], @@ -507,6 +637,430 @@ test("CircuitBreaker skips recording failures for open circuits", async () => { ); }); +test("CircuitBreaker writes stored states with a TTL", async () => { + const kv = new CountingCasKvStore(); + await markCircuitBreakerLegacySweepDone(kv); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { + failureThreshold: 2, + failureWindow: { hours: 3 }, + recoveryDelay: { hours: 1 }, + heldActivityTtl: { days: 2 }, + }, + }); + + await circuit.recordFailure("remote.example"); + + assertEquals(kv.options.at(-1), { + ttl: Temporal.Duration.from({ days: 2, hours: 1 }), + }); +}); + +test("CircuitBreaker writes stored states with a TTL without CAS", async () => { + const kv = new CountingSetKvStore(); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { + failureThreshold: 2, + failureWindow: { hours: 3 }, + recoveryDelay: { hours: 1 }, + heldActivityTtl: { days: 2 }, + }, + }); + + await circuit.recordFailure("remote.example"); + + assertEquals(kv.keys, [["_fedify", "circuit", "remote.example"]]); + assertEquals(kv.options.at(-1), { + ttl: Temporal.Duration.from({ days: 2, hours: 1 }), + }); +}); + +test("CircuitBreaker preserves custom failure history by default", async () => { + const kv = new CountingCasKvStore(); + await markCircuitBreakerLegacySweepDone(kv); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { + failure: (timestamps) => timestamps.length >= 2, + }, + }); + + await circuit.recordFailure("remote.example"); + + assertEquals(kv.options.at(-1), undefined); +}); + +test("CircuitBreaker accepts an explicit TTL with custom failure policies", async () => { + const kv = new CountingCasKvStore(); + await markCircuitBreakerLegacySweepDone(kv); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { + failure: (timestamps) => timestamps.length >= 2, + stateTtl: { days: 14 }, + }, + }); + + await circuit.recordFailure("remote.example"); + + assertEquals(kv.options.at(-1), { + ttl: Temporal.Duration.from({ days: 14 }), + }); +}); + +test("CircuitBreaker migrates legacy states without TTL once", async () => { + const kv = new CountingSweepKvStore(); + await kv.set(["_fedify", "circuit", "stale-a.example"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + await kv.set(["_fedify", "circuit", "stale-b.example"], { + state: "open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + }); + await kv.set(["_fedify", "circuit", "malformed.example"], "stale"); + await kv.set(["_fedify", "other", "untouched.example"], "keep"); + await kv.set(["_fedify", "circuit", "__fedify_meta", "future"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { failureThreshold: 2 }, + }); + + assertEquals(await circuit.beforeSend("stale-b.example", {}), { + type: "hold", + state: "open", + delay: Temporal.Duration.from({ minutes: 30 }), + heldSince: Temporal.Instant.from("2026-05-25T00:00:00Z"), + }); + await circuit.pendingSweep; + + assertEquals( + await kv.get(["_fedify", "circuit", "stale-a.example"]), + { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + __fedifyCircuitBreakerStateVersion: 1, + }, + ); + assertEquals( + await kv.get(["_fedify", "circuit", "stale-b.example"]), + { + state: "open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + __fedifyCircuitBreakerStateVersion: 1, + }, + ); + assertEquals( + await kv.get(["_fedify", "circuit", "malformed.example"]), + undefined, + ); + assertEquals( + await kv.get(["_fedify", "other", "untouched.example"]), + "keep", + ); + assertEquals( + await kv.get(["_fedify", "circuit", "__fedify_meta", "future"]), + { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }, + ); + await circuit.recordFailure("remote.example"); + assertEquals(await circuit.getState("remote.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + assertEquals(kv.listCalls, 1); + assertEquals( + kv.casCalls.find(({ key }) => + key.length === 3 && + key[0] === "_fedify" && + key[1] === "circuit" && + key[2] === "stale-a.example" + )?.options, + { ttl: Temporal.Duration.from({ days: 7 }) }, + ); + assertEquals(kv.deletedKeys, [ + ["_fedify", "circuit", "malformed.example"], + ]); + assertEquals( + await kv.get([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ]), + { state: "done", retryUntil: "2026-06-01T00:00:00Z" }, + ); + + await kv.set(["_fedify", "circuit", "late-stale.example"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + await circuit.recordFailure("another.example"); + + assertEquals(await kv.get(["_fedify", "circuit", "late-stale.example"]), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + assertEquals(kv.listCalls, 1); + + now = Temporal.Instant.from("2026-06-02T00:00:00Z"); + await circuit.recordFailure("final.example"); + await circuit.pendingSweep; + + assertEquals( + await kv.get(["_fedify", "circuit", "late-stale.example"]), + { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + __fedifyCircuitBreakerStateVersion: 1, + }, + ); + assertEquals(await circuit.getState("another.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + assertEquals(kv.listCalls, 2); + assertEquals( + await kv.get([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ]), + { state: "final" }, + ); +}); + +test("CircuitBreaker does not sweep state changed after listing", async () => { + const kv = new UpdatingDuringListKvStore(); + await kv.set(["_fedify", "circuit", "racy.example"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:02:00Z"), + options: { failureThreshold: 2 }, + }); + + await circuit.recordFailure("remote.example"); + await circuit.pendingSweep; + + assertEquals(await kv.get(["_fedify", "circuit", "racy.example"]), { + state: "closed", + failures: ["2026-05-25T00:01:00Z"], + }); +}); + +test("CircuitBreaker skips legacy sweep already running elsewhere", async () => { + const kv = new CountingSweepKvStore(); + await kv.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { + state: "sweeping", + started: "2026-05-25T00:00:00Z", + retryUntil: "2026-06-01T00:00:00Z", + }); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { failureThreshold: 2 }, + }); + + await circuit.recordFailure("remote.example"); + + assertEquals(kv.listCalls, 0); + assertEquals(await circuit.getState("remote.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); +}); + +test("CircuitBreaker retries expired legacy sweep markers", async () => { + const kv = new CountingSweepKvStore(); + await kv.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { + state: "sweeping", + started: "2026-05-25T00:00:00Z", + retryUntil: "2026-06-01T00:00:00Z", + }); + await kv.set(["_fedify", "circuit", "stale.example"], { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:06:00Z"), + options: { failureThreshold: 2 }, + }); + + await circuit.recordFailure("remote.example"); + await circuit.pendingSweep; + + assertEquals(kv.listCalls, 1); + assertEquals(await kv.get(["_fedify", "circuit", "stale.example"]), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + __fedifyCircuitBreakerStateVersion: 1, + }); + assertEquals( + await kv.get([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ]), + { state: "done", retryUntil: "2026-06-01T00:00:00Z" }, + ); +}); + +test("CircuitBreaker caches completed legacy sweep markers", async () => { + const doneKv = new CountingSweepKvStore(); + await doneKv.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { + state: "done", + retryUntil: "2026-06-01T00:00:00Z", + }); + let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); + const doneCircuit = new CircuitBreaker({ + kv: doneKv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { failureThreshold: 2 }, + }); + + await doneCircuit.recordFailure("remote.example"); + await doneCircuit.pendingSweep; + await doneCircuit.recordFailure("another.example"); + await doneCircuit.pendingSweep; + + assertEquals(doneKv.markerGetCalls, 1); + + now = Temporal.Instant.from("2026-06-02T00:00:00Z"); + await doneCircuit.recordFailure("final.example"); + await doneCircuit.pendingSweep; + + assertEquals(doneKv.markerGetCalls, 2); + + const finalKv = new CountingSweepKvStore(); + await markCircuitBreakerLegacySweepDone(finalKv); + const finalCircuit = new CircuitBreaker({ + kv: finalKv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { failureThreshold: 2 }, + }); + + await finalCircuit.recordFailure("remote.example"); + await finalCircuit.pendingSweep; + await finalCircuit.recordFailure("another.example"); + await finalCircuit.pendingSweep; + + assertEquals(finalKv.markerGetCalls, 1); +}); + +test("CircuitBreaker ignores malformed legacy sweep retry markers", async () => { + const kv = new CountingSweepKvStore(); + await kv.set([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ], { + state: "done", + retryUntil: "not an instant", + }); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { failureThreshold: 2 }, + }); + + await circuit.recordFailure("remote.example"); + await circuit.pendingSweep; + + assertEquals(kv.listCalls, 0); + assertEquals(await circuit.getState("remote.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); +}); + +test("CircuitBreaker retries legacy sweep after transient failures", async () => { + const kv = new FailingOnceSweepKvStore(); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:00:00Z"), + options: { failureThreshold: 2 }, + }); + + await circuit.recordFailure("remote.example"); + await circuit.pendingSweep; + assertEquals( + await kv.get([ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ]), + undefined, + ); + assertEquals(kv.deletedKeys, [[ + "_fedify", + "circuit", + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", + ]]); + assertEquals(await circuit.getState("remote.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + + await circuit.recordFailure("another.example"); + await circuit.pendingSweep; + assertEquals(kv.listCalls, 2); + assertEquals(await circuit.getState("another.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); +}); + test("CircuitBreaker prunes stale closed failure history", async () => { const kv = new MemoryKvStore(); let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); diff --git a/packages/fedify/src/federation/circuit-breaker.ts b/packages/fedify/src/federation/circuit-breaker.ts index 4fd13959a..2cbb37cc8 100644 --- a/packages/fedify/src/federation/circuit-breaker.ts +++ b/packages/fedify/src/federation/circuit-breaker.ts @@ -19,6 +19,10 @@ export interface CircuitBreakerKvState { readonly halfOpened?: string; } +type StoredCircuitBreakerKvState = CircuitBreakerKvState & { + readonly __fedifyCircuitBreakerStateVersion?: 1; +}; + /** * Details passed to {@link CircuitBreakerOptions.onActivityDrop} when a held * activity expires before the remote host recovers. @@ -81,6 +85,17 @@ export type CircuitBreakerOptions = CircuitBreakerFailurePolicy & { */ readonly releaseInterval?: Temporal.Duration | Temporal.DurationLike; + /** + * How long Fedify keeps circuit breaker state in the configured key-value + * store. + * + * When omitted, Fedify derives this from `failureWindow`, `recoveryDelay`, and + * `heldActivityTtl` for the default numeric failure policy. Custom `failure` + * callbacks do not have an inspectable time window, so custom policies do not + * expire their stored state unless this option is provided. + */ + readonly stateTtl?: Temporal.Duration | Temporal.DurationLike; + /** * Called whenever the circuit state changes. */ @@ -112,11 +127,38 @@ export interface NormalizedCircuitBreakerOptions { readonly recoveryDelay: Temporal.Duration; readonly heldActivityTtl: Temporal.Duration; readonly releaseInterval: Temporal.Duration; + readonly stateTtl: Temporal.Duration | undefined; readonly onStateChange?: CircuitBreakerOptions["onStateChange"]; readonly onActivityDrop?: CircuitBreakerOptions["onActivityDrop"]; } const MAX_CUSTOM_FAILURE_HISTORY = 100; +// Fedify 2.3.0 and 2.3.1 wrote circuit breaker state without a TTL, so those +// soft-state entries could live forever for hosts that never recovered. Keep a +// marker under the circuit prefix to clear old state on CAS-backed stores after +// upgrade, then retry once after a grace window in case old workers wrote more +// no-TTL state during a rolling deployment. See: +// https://github.com/fedify-dev/fedify/issues/916 +const LEGACY_SWEEP_MARKER = [ + "__fedify_meta", + "circuit_breaker_state_ttl_sweep_v1", +] as const; +const LEGACY_SWEEP_DELETING_MARKER = { + __fedifyDeletingCircuitBreakerLegacyState: true, +}; +const CIRCUIT_BREAKER_STATE_VERSION = 1; +const LEGACY_SWEEP_LOCK_TTL = Temporal.Duration.from({ minutes: 5 }); +const LEGACY_SWEEP_RETRY_WINDOW = Temporal.Duration.from({ hours: 24 * 7 }); +const LEGACY_SWEEP_WAIT_INTERVAL = 10; + +type LegacySweepMarker = + | { + readonly state: "sweeping"; + readonly started: string; + readonly retryUntil: string; + } + | { readonly state: "done"; readonly retryUntil: string } + | { readonly state: "final" }; /** * Constructor options for {@link CircuitBreaker}. @@ -174,6 +216,8 @@ export class CircuitBreaker { readonly #prefix: KvKey; readonly #options: NormalizedCircuitBreakerOptions; readonly #now: () => Temporal.Instant; + #legacySweep: Promise | undefined; + #legacySweepDone: Temporal.Instant | "final" | undefined; readonly #stateChangeObserver: | CircuitBreakerCreateOptions["stateChangeObserver"] | undefined; @@ -204,6 +248,7 @@ export class CircuitBreaker { remoteHost: string, message: { readonly circuitHeldSince?: string }, ): Promise { + this.#sweepLegacyStates(); const heldSince = parseHeldSince(message.circuitHeldSince); const now = this.#now(); if ( @@ -307,6 +352,7 @@ export class CircuitBreaker { async recordSuccess( remoteHost: string, ): Promise { + this.#sweepLegacyStates(); for (let attempt = 0; attempt < 10; attempt++) { const oldState = await this.#get(remoteHost); if (oldState == null) return undefined; @@ -333,6 +379,7 @@ export class CircuitBreaker { async recordFailure( remoteHost: string, ): Promise { + this.#sweepLegacyStates(); const now = this.#now(); for (let attempt = 0; attempt < 10; attempt++) { const oldState = await this.#get(remoteHost); @@ -395,13 +442,152 @@ export class CircuitBreaker { async getState( remoteHost: string, ): Promise { - return await this.#get(remoteHost); + this.#sweepLegacyStates(); + return stripStoredCircuitBreakerState(await this.#get(remoteHost)); + } + + /** + * The currently running background legacy sweep, if any. + * @internal + */ + get pendingSweep(): Promise | undefined { + return this.#legacySweep; } #key(remoteHost: string): KvKey { return [...this.#prefix, remoteHost] as KvKey; } + #legacySweepMarkerKey(): KvKey { + return [...this.#prefix, ...LEGACY_SWEEP_MARKER] as KvKey; + } + + #sweepLegacyStates(): void { + if (this.#kv.cas == null) return; + if (this.#legacySweep != null) return; + if (this.#isLegacySweepDone()) return; + this.#legacySweep = this.#sweepLegacyStatesImpl() + .catch((error) => { + getLogger(["fedify", "federation", "circuit"]).warn( + "Failed to sweep legacy circuit breaker state:\n{error}", + { error }, + ); + }) + .finally(() => { + this.#legacySweep = undefined; + }); + } + + async #sweepLegacyStatesImpl(): Promise { + const markerKey = this.#legacySweepMarkerKey(); + const marker = await this.#acquireLegacySweep(markerKey); + if (marker === "done") return; + try { + for await (const { key, value } of this.#kv.list(this.#prefix)) { + if (isEqualKvKey(key, markerKey)) continue; + if (key.length !== this.#prefix.length + 1) continue; + await this.#migrateLegacyState(key, value); + } + } catch (error) { + await this.#deleteIfUnchanged(markerKey, marker); + throw error; + } + const finishedMarker = this.#finishLegacySweepMarker(marker); + await this.#kv.set(markerKey, finishedMarker); + this.#rememberLegacySweepMarker(finishedMarker); + } + + #finishLegacySweepMarker(marker: LegacySweepMarker): LegacySweepMarker { + const retryUntil = getLegacySweepRetryUntil(marker); + if ( + retryUntil != null && + Temporal.Instant.compare( + this.#now(), + Temporal.Instant.from(retryUntil), + ) < + 0 + ) { + return { state: "done", retryUntil }; + } + return { state: "final" }; + } + + async #migrateLegacyState(key: KvKey, value: unknown): Promise { + if (isCurrentCircuitBreakerState(value)) return; + const state = parseCircuitBreakerKvState(value); + if (state != null) { + await this.#kv.cas!( + key, + value, + markCircuitBreakerState(state), + this.#setOptions(), + ); + return; + } + await this.#deleteIfUnchanged(key, value); + } + + async #deleteIfUnchanged(key: KvKey, value: unknown): Promise { + if ( + await this.#kv.cas!(key, value, LEGACY_SWEEP_DELETING_MARKER, { + ttl: LEGACY_SWEEP_LOCK_TTL, + }) + ) { + await this.#kv.delete(key); + } + } + + async #acquireLegacySweep( + markerKey: KvKey, + ): Promise { + while (true) { + const marker = await this.#kv.get(markerKey); + const now = this.#now(); + if (isLegacySweepActive(marker, now)) { + this.#rememberLegacySweepMarker(marker); + return "done"; + } + if ( + marker != null && !isLegacySweepRetryDue(marker, now) && + !isLegacySweepStale(marker, now) + ) { + return "done"; + } + const retryUntil = isLegacySweepRetryDue(marker, now) + ? marker.retryUntil + : getLegacySweepRetryUntil(marker) ?? + now.add(LEGACY_SWEEP_RETRY_WINDOW).toString(); + const sweeping = { + state: "sweeping", + started: now.toString(), + retryUntil, + } satisfies LegacySweepMarker; + if ( + await this.#kv.cas!(markerKey, marker ?? undefined, sweeping, { + ttl: LEGACY_SWEEP_LOCK_TTL, + }) + ) { + return sweeping; + } + await delay(LEGACY_SWEEP_WAIT_INTERVAL); + } + } + + #isLegacySweepDone(): boolean { + if (this.#legacySweepDone === "final") return true; + if (this.#legacySweepDone == null) return false; + if (Temporal.Instant.compare(this.#now(), this.#legacySweepDone) < 0) { + return true; + } + this.#legacySweepDone = undefined; + return false; + } + + #rememberLegacySweepMarker(marker: unknown): void { + const doneUntil = getLegacySweepDoneUntil(marker, this.#now()); + if (doneUntil != null) this.#legacySweepDone = doneUntil; + } + #capHeldRetryAt( now: Temporal.Instant, heldSince: Temporal.Instant | undefined, @@ -414,27 +600,43 @@ export class CircuitBreaker { : retryAt; } - async #get(remoteHost: string): Promise { - return parseCircuitBreakerKvState( + async #get( + remoteHost: string, + ): Promise { + return parseStoredCircuitBreakerKvState( await this.#kv.get(this.#key(remoteHost)), ); } async #replace( remoteHost: string, - oldState: CircuitBreakerKvState | undefined, + oldState: StoredCircuitBreakerKvState | undefined, newState: CircuitBreakerKvState | undefined, ): Promise { const key = this.#key(remoteHost); + const storedState = newState == null ? undefined : markCircuitBreakerState( + newState, + ); if (this.#kv.cas == null) { - if (newState == null) { + if (storedState == null) { await this.#kv.delete(key); } else { - await this.#kv.set(key, newState); + await this.#kv.set(key, storedState, this.#setOptions()); } return true; } - return await this.#kv.cas(key, oldState, newState); + return await this.#kv.cas( + key, + oldState, + storedState, + storedState == null ? undefined : this.#setOptions(), + ); + } + + #setOptions(): { ttl: Temporal.Duration } | undefined { + return this.#options.stateTtl == null + ? undefined + : { ttl: this.#options.stateTtl }; } async #notifyStateChange( @@ -485,14 +687,21 @@ export function normalizeCircuitBreakerOptions( const releaseInterval = toInstantDuration( options.releaseInterval ?? { seconds: 1 }, ); + const configuredStateTtl = options.stateTtl == null + ? undefined + : toInstantDuration(options.stateTtl); assertPositiveDuration(recoveryDelay, "recoveryDelay"); assertPositiveDuration(heldActivityTtl, "heldActivityTtl"); assertPositiveDuration(releaseInterval, "releaseInterval"); + if (configuredStateTtl != null) { + assertPositiveDuration(configuredStateTtl, "stateTtl"); + } let failure: (timestamps: readonly Temporal.Instant[]) => boolean; let pruneFailures: ( timestamps: readonly Temporal.Instant[], now: Temporal.Instant, ) => readonly Temporal.Instant[]; + let stateTtl: Temporal.Duration | undefined; if (options.failure == null) { const failureThreshold = options.failureThreshold ?? 5; if (!Number.isInteger(failureThreshold) || failureThreshold <= 0) { @@ -516,10 +725,16 @@ export function normalizeCircuitBreakerOptions( const last = timestamps[timestamps.length - 1]; return Temporal.Duration.compare(first.until(last), failureWindow) <= 0; }; + stateTtl = configuredStateTtl ?? + maxDuration( + recoveryDelay.add(failureWindow), + recoveryDelay.add(heldActivityTtl), + ); } else { failure = options.failure; pruneFailures = (timestamps) => timestamps.slice(-MAX_CUSTOM_FAILURE_HISTORY); + stateTtl = configuredStateTtl; } return { failure, @@ -527,11 +742,156 @@ export function normalizeCircuitBreakerOptions( recoveryDelay, heldActivityTtl, releaseInterval, + stateTtl, onStateChange: options.onStateChange, onActivityDrop: options.onActivityDrop, }; } +function maxDuration( + duration: Temporal.Duration, + ...durations: Temporal.Duration[] +): Temporal.Duration { + return durations.reduce( + (max, candidate) => + Temporal.Duration.compare(candidate, max) > 0 ? candidate : max, + duration, + ); +} + +function isEqualKvKey(left: KvKey, right: KvKey): boolean { + return left.length === right.length && + left.every((part, index) => part === right[index]); +} + +function isLegacySweepDone( + value: unknown, + now: Temporal.Instant, +): value is LegacySweepMarker { + return getLegacySweepDoneUntil(value, now) != null; +} + +function getLegacySweepDoneUntil( + value: unknown, + now: Temporal.Instant, +): Temporal.Instant | "final" | undefined { + if (typeof value !== "object" || value == null || !("state" in value)) { + return undefined; + } + if (value.state === "final") return "final"; + if ( + value.state === "done" && "retryUntil" in value && + typeof value.retryUntil === "string" + ) { + try { + const retryUntil = Temporal.Instant.from(value.retryUntil); + return Temporal.Instant.compare(now, retryUntil) < 0 + ? retryUntil + : undefined; + } catch { + return undefined; + } + } + return undefined; +} + +function isLegacySweepRetryDue( + value: unknown, + now: Temporal.Instant, +): value is Extract { + if ( + typeof value !== "object" || value == null || + !("state" in value) || value.state !== "done" || + !("retryUntil" in value) || typeof value.retryUntil !== "string" + ) { + return false; + } + try { + return Temporal.Instant.compare( + now, + Temporal.Instant.from(value.retryUntil), + ) >= 0; + } catch { + return false; + } +} + +function isLegacySweepActive(value: unknown, now: Temporal.Instant): boolean { + return isLegacySweepDone(value, now) || isLegacySweepInProgress(value, now); +} + +function isLegacySweepInProgress( + value: unknown, + now: Temporal.Instant, +): boolean { + if ( + typeof value !== "object" || value == null || + !("state" in value) || value.state !== "sweeping" || + !("started" in value) || typeof value.started !== "string" + ) { + return false; + } + try { + return Temporal.Instant.compare( + now, + Temporal.Instant.from(value.started).add(LEGACY_SWEEP_LOCK_TTL), + ) < 0; + } catch { + return false; + } +} + +function isLegacySweepStale( + value: unknown, + now: Temporal.Instant, +): value is Extract { + return typeof value === "object" && value != null && + "state" in value && value.state === "sweeping" && + !isLegacySweepInProgress(value, now); +} + +function getLegacySweepRetryUntil(value: unknown): string | undefined { + if ( + typeof value !== "object" || value == null || + !("retryUntil" in value) || typeof value.retryUntil !== "string" + ) { + return undefined; + } + try { + Temporal.Instant.from(value.retryUntil); + return value.retryUntil; + } catch { + return undefined; + } +} + +function isCurrentCircuitBreakerState(value: unknown): boolean { + return typeof value === "object" && value != null && + "__fedifyCircuitBreakerStateVersion" in value && + value.__fedifyCircuitBreakerStateVersion === CIRCUIT_BREAKER_STATE_VERSION; +} + +function markCircuitBreakerState( + state: CircuitBreakerKvState, +): StoredCircuitBreakerKvState { + return { + ...state, + __fedifyCircuitBreakerStateVersion: CIRCUIT_BREAKER_STATE_VERSION, + }; +} + +function stripStoredCircuitBreakerState( + state: StoredCircuitBreakerKvState | undefined, +): CircuitBreakerKvState | undefined { + if (state == null) return undefined; + const { __fedifyCircuitBreakerStateVersion: _, ...publicState } = state; + return publicState; +} + +function delay(milliseconds: number): Promise { + return new Promise((resolve) => setTimeout(resolve, milliseconds)); +} + function toInstantDuration( duration: Temporal.Duration | Temporal.DurationLike, ): Temporal.Duration { @@ -580,6 +940,14 @@ function parseHeldSince( export function parseCircuitBreakerKvState( value: unknown, ): CircuitBreakerKvState | undefined { + return stripStoredCircuitBreakerState( + parseStoredCircuitBreakerKvState(value), + ); +} + +function parseStoredCircuitBreakerKvState( + value: unknown, +): StoredCircuitBreakerKvState | undefined { const isInstantString = (v: unknown): v is string => { if (typeof v !== "string") return false; try { @@ -615,5 +983,9 @@ export function parseCircuitBreakerKvState( failures: record.failures, ...(record.opened == null ? {} : { opened: record.opened }), ...(record.halfOpened == null ? {} : { halfOpened: record.halfOpened }), + ...(record.__fedifyCircuitBreakerStateVersion === + CIRCUIT_BREAKER_STATE_VERSION + ? { __fedifyCircuitBreakerStateVersion: CIRCUIT_BREAKER_STATE_VERSION } + : {}), }; } diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index ad073b5df..87326c1de 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -73,6 +73,7 @@ import { import { recordInboxActivity } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; import type { InboxMessage, Message, OutboxMessage } from "./queue.ts"; +import { markCircuitBreakerLegacySweepDone } from "./circuit-breaker-test-utils.ts"; type IsEqual = (() => T extends A ? 1 : 2) extends (() => T extends B ? 1 : 2) ? true : false; @@ -7375,6 +7376,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { const { federation, queued, kv } = setup({ failureThreshold: 1, }); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "success-bookkeeping.example"], { state: "closed", failures: [], @@ -7601,6 +7603,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { }, { permanentFailureStatusCodes: [500] }, ); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "permanent-bookkeeping.example"], { state: "half-open", failures: ["2026-05-25T00:00:00Z"], @@ -7938,6 +7941,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { }, { permanentFailureStatusCodes: [500] }, ); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "permanent-probe.example"], { state: "half-open", failures: ["2026-05-25T00:00:00Z"], @@ -7973,6 +7977,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { failureThreshold: 1, releaseInterval: { seconds: 1 }, }); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "gone.example"], { state: "half-open", failures: ["2026-05-25T00:00:00Z"], @@ -8108,6 +8113,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { releaseInterval: { minutes: 1 }, }, }); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "half-open-telemetry.example"], { state: "half-open", failures: ["2026-05-25T00:00:00Z"], @@ -8166,6 +8172,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { recoveryDelay: { seconds: 1 }, }, }); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "stale-probe-telemetry.example"], { state: "half-open", failures: ["2026-05-25T00:00:00Z"], @@ -8265,6 +8272,7 @@ test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { }, }, }); + await markCircuitBreakerLegacySweepDone(kv); await kv.set(["_fedify", "circuit", "expired-probe.example"], { state: "half-open", failures: ["2026-05-25T00:00:00Z"], diff --git a/packages/postgres/src/kv.test.ts b/packages/postgres/src/kv.test.ts index aaee02954..8e80e1b97 100644 --- a/packages/postgres/src/kv.test.ts +++ b/packages/postgres/src/kv.test.ts @@ -107,6 +107,46 @@ test("PostgresKvStore.set()", { skip: dbUrl == null }, async () => { } }); +test( + "PostgresKvStore.set() refreshes TTL origin on update", + { skip: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + + const { sql, tableName, store } = getStore(); + try { + await store.initialize(); + await sql` + INSERT INTO ${sql(tableName)} (key, value, created) + VALUES ( + ${["ttl", "origin"]}, + ${"stale"}, + CURRENT_TIMESTAMP - INTERVAL '2 days' + ) + `; + + await store.set(["ttl", "origin"], "fresh", { + ttl: Temporal.Duration.from({ days: 1 }), + }); + + assert.strictEqual(await store.get(["ttl", "origin"]), "fresh"); + const result = await sql` + SELECT created + FROM ${sql(tableName)} + WHERE key = ${["ttl", "origin"]} + `; + assert.strictEqual(result.length, 1); + assert( + result[0].created > new Date(Date.now() - 60_000), + "created timestamp should be refreshed on TTL update", + ); + } finally { + await store.drop(); + await sql.end(); + } + }, +); + test("PostgresKvStore.delete()", { skip: dbUrl == null }, async () => { if (dbUrl == null) return; // Bun does not support skip option diff --git a/packages/postgres/src/kv.ts b/packages/postgres/src/kv.ts index e289bca3a..13cad9511 100644 --- a/packages/postgres/src/kv.ts +++ b/packages/postgres/src/kv.ts @@ -98,7 +98,10 @@ export class PostgresKvStore implements KvStore { ${ttl} ) ON CONFLICT (key) - DO UPDATE SET value = EXCLUDED.value, ttl = EXCLUDED.ttl; + DO UPDATE SET + value = EXCLUDED.value, + created = CURRENT_TIMESTAMP, + ttl = EXCLUDED.ttl; `; await this.#expire(); }