From 4a5eb52a5e1432524123396721ab4da13678fcd8 Mon Sep 17 00:00:00 2001 From: David de Boer Date: Thu, 28 May 2026 13:39:41 +0200 Subject: [PATCH 1/6] feat(pipeline): adaptive per-endpoint SPARQL timeouts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Introduce TimeoutPolicy interface with ConstantTimeoutPolicy and AdaptiveTimeoutPolicy implementations, plus matching factories - Inject per-call AbortSignal in SparqlConstructExecutor and SparqlItemSelector; classify HTTP 504 and AbortError as 'timeout' - Thread a per-dataset TimeoutPolicy through PipelineOptions → Stage → executors/selectors via ExecuteOptions and SelectOptions - Forward tighten/relax transitions to ProgressReporter; ConsoleReporter prints them - Breaking: SparqlConstructExecutorOptions.timeout is replaced by timeoutPolicy; migrate pipeline-void and pipeline-shacl-sampler - Document the mechanism in the pipeline README and ADR 0003 --- ...3-adaptive-per-endpoint-sparql-timeouts.md | 50 +++ .../src/consoleReporter.ts | 21 + .../test/consoleReporter.test.ts | 36 ++ .../pipeline-console-reporter/vite.config.ts | 6 +- .../src/sampleStages.ts | 3 +- packages/pipeline-void/src/stage.ts | 7 +- packages/pipeline/README.md | 21 + packages/pipeline/src/pipeline.ts | 61 ++- packages/pipeline/src/progressReporter.ts | 12 + packages/pipeline/src/sparql/executor.ts | 153 ++++++- packages/pipeline/src/sparql/index.ts | 14 + packages/pipeline/src/sparql/selector.ts | 91 +++- packages/pipeline/src/sparql/timeoutPolicy.ts | 277 ++++++++++++ packages/pipeline/src/stage.ts | 36 +- packages/pipeline/test/pipeline.test.ts | 121 +++++ .../pipeline/test/sparql/executor.test.ts | 258 ++++++++++- .../pipeline/test/sparql/selector.test.ts | 130 ++++++ .../test/sparql/timeoutPolicy.test.ts | 421 ++++++++++++++++++ packages/pipeline/test/stage.test.ts | 53 ++- packages/pipeline/vite.config.ts | 8 +- 20 files changed, 1708 insertions(+), 71 deletions(-) create mode 100644 
docs/decisions/0003-adaptive-per-endpoint-sparql-timeouts.md create mode 100644 packages/pipeline/src/sparql/timeoutPolicy.ts create mode 100644 packages/pipeline/test/sparql/timeoutPolicy.test.ts diff --git a/docs/decisions/0003-adaptive-per-endpoint-sparql-timeouts.md b/docs/decisions/0003-adaptive-per-endpoint-sparql-timeouts.md new file mode 100644 index 00000000..17b58fc2 --- /dev/null +++ b/docs/decisions/0003-adaptive-per-endpoint-sparql-timeouts.md @@ -0,0 +1,50 @@ +# 3. Adopt adaptive per-endpoint SPARQL timeouts + +Date: 2026-05-28 + +## Status + +Accepted + +## Context + +The DKG pipeline analyses ~500 third-party SPARQL endpoints. Some endpoints (notably TriplyDB-hosted ones) serve light queries quickly but reliably time out on heavy analytical queries, returning HTTP 504 after their own internal query budget elapses. With a fixed 5-minute client-side timeout, a single offending dataset can spend ~80 minutes cycling through stage-level timeouts before the pipeline moves on. + +We need the pipeline to learn from “this endpoint just timed out twice” and apply a tighter budget to subsequent requests against the same endpoint, so light queries still succeed while heavy queries fast-fail. + +## Decision + +`SparqlConstructExecutor` and `SparqlItemSelector` accept a per-call `TimeoutPolicy` injected by the `Pipeline` at dataset boundaries. Two built-in policies ship: + +- `ConstantTimeoutPolicy` – returns the same budget for every request. Used as the implicit default (`constantTimeoutPolicy(300_000)`) when `PipelineOptions.timeoutPolicy` is omitted, so existing call sites see no behavioural change. +- `AdaptiveTimeoutPolicy` – tracks consecutive timeouts per endpoint within a dataset. After `threshold` consecutive timeouts, subsequent requests against that endpoint use the short budget; a single successful request relaxes back to the default budget. Construction validates `short < default` and `threshold ≥ 1`. 
+ +The `TimeoutPolicy` interface is intentionally narrow: + +```ts +interface TimeoutPolicy { + beforeRequest(context: { endpoint: URL }): number; + afterRequest(context: { + endpoint: URL; + outcome: 'ok' | 'error' | 'timeout'; + durationMs: number; + error?: unknown; + }): void; + subscribe?(observer: TimeoutPolicyObserver): () => void; +} +``` + +Key decisions: + +- **Per-dataset scope.** `PipelineOptions.timeoutPolicy` accepts a `() => TimeoutPolicy` factory. The pipeline invokes it once per dataset so one bad dataset doesn’t poison the next. +- **Per-attempt hooks.** Policy hooks fire inside the `pRetry` callback, not around it, so a retried timeout already runs with the tightened budget. +- **Outcome classification.** HTTP 504 from upstream counts as a `timeout`, alongside `AbortError` / `TimeoutError` from our own `AbortSignal.timeout()`. All other errors are neutral (`error`). +- **Breaking change.** `SparqlConstructExecutorOptions.timeout: number` is removed and replaced by `timeoutPolicy?: TimeoutPolicy`. Pre-release per `AGENTS.md`, so the cleaner API is preferred over a permanent `number | TimeoutPolicy` union. Call sites passing `timeout: 5000` migrate to `timeoutPolicy: new ConstantTimeoutPolicy(5_000)`. +- **Observability.** `ProgressReporter` gains optional `timeoutTightened` / `timeoutRelaxed` hooks. The `Pipeline` subscribes to the policy at each dataset boundary and forwards transitions. `ConsoleReporter` prints `↘ Tightened` / `↗ Relaxed` lines so operators can distinguish a fast-failed stage from an unexpected speedup. +- **No off-the-shelf library.** Circuit breakers (`cockatiel`, `opossum`) implement deny semantics; we want to keep serving requests with a shorter budget. The homegrown ~50-line state machine is the right fit; the interface is stable enough to swap in a fuller resilience framework later if other requirements emerge. 
+ +## Consequences + +- DKG can opt into `adaptiveTimeoutPolicy({ default: 300_000, short: 10_000, threshold: 2 })` once `@lde/pipeline` is released. Expected effect on the razu.nl case: worst-case wall-clock per troublesome dataset drops from ~80 min to ~15 min, with the same partial output preserved. +- Integrators implementing a custom `TimeoutPolicy` can plug in shared state across datasets by closing over it in the factory. +- `Executor` and `ItemSelector` implementations that thread `ExecuteOptions` / `SelectOptions` through to inner SPARQL calls require no code changes; ones that ignore the options pay only the cost of not benefiting from adaptive behaviour. diff --git a/packages/pipeline-console-reporter/src/consoleReporter.ts b/packages/pipeline-console-reporter/src/consoleReporter.ts index 8f23a1df..ac8e57af 100644 --- a/packages/pipeline-console-reporter/src/consoleReporter.ts +++ b/packages/pipeline-console-reporter/src/consoleReporter.ts @@ -2,6 +2,7 @@ import type { Dataset, Distribution } from '@lde/dataset'; import type { DistributionAnalysisResult, ProgressReporter, + TimeoutTransitionEvent, ValidationReport, } from '@lde/pipeline'; import chalk from 'chalk'; @@ -283,4 +284,24 @@ export class ConsoleReporter implements ProgressReporter { )} ${chalk.dim(`(memory: ${formatBytes(result.memoryUsageBytes)} RSS, ${formatBytes(result.heapUsedBytes)} heap)`)}\n`, ); } + + timeoutTightened(event: TimeoutTransitionEvent): void { + this.printLine( + chalk.yellow('↘'), + `Tightened timeout for ${event.endpoint.toString()} to ${chalk.bold( + prettyMilliseconds(event.toTimeoutMs), + )} after ${chalk.bold(event.consecutiveTimeouts)} consecutive timeouts`, + 2, + ); + } + + timeoutRelaxed(event: TimeoutTransitionEvent): void { + this.printLine( + chalk.green('↗'), + `Relaxed timeout for ${event.endpoint.toString()} back to ${chalk.bold( + prettyMilliseconds(event.toTimeoutMs), + )} after successful request`, + 2, + ); + } } diff --git 
a/packages/pipeline-console-reporter/test/consoleReporter.test.ts b/packages/pipeline-console-reporter/test/consoleReporter.test.ts index 0f0d6227..2148b9c3 100644 --- a/packages/pipeline-console-reporter/test/consoleReporter.test.ts +++ b/packages/pipeline-console-reporter/test/consoleReporter.test.ts @@ -207,4 +207,40 @@ describe('ConsoleReporter', () => { expect(output).toContain('to http://localhost:7001/sparql'); }); }); + + describe('timeout transitions', () => { + it('prints a tightened-timeout line', () => { + const reporter = new ConsoleReporter(); + const spy = vi.spyOn(process.stderr, 'write').mockReturnValue(true); + + reporter.timeoutTightened({ + endpoint: new URL('https://data.razu.nl/sparql'), + fromTimeoutMs: 300_000, + toTimeoutMs: 10_000, + consecutiveTimeouts: 2, + }); + + const output = spy.mock.calls.map((c) => String(c[0])).join(''); + expect(output).toContain('Tightened'); + expect(output).toContain('https://data.razu.nl/sparql'); + expect(output).toContain('10s'); + expect(output).toContain('2'); + }); + + it('prints a relaxed-timeout line', () => { + const reporter = new ConsoleReporter(); + const spy = vi.spyOn(process.stderr, 'write').mockReturnValue(true); + + reporter.timeoutRelaxed({ + endpoint: new URL('https://data.razu.nl/sparql'), + fromTimeoutMs: 10_000, + toTimeoutMs: 300_000, + consecutiveTimeouts: 0, + }); + + const output = spy.mock.calls.map((c) => String(c[0])).join(''); + expect(output).toContain('Relaxed'); + expect(output).toContain('https://data.razu.nl/sparql'); + }); + }); }); diff --git a/packages/pipeline-console-reporter/vite.config.ts b/packages/pipeline-console-reporter/vite.config.ts index 54b2dafe..d51251a2 100644 --- a/packages/pipeline-console-reporter/vite.config.ts +++ b/packages/pipeline-console-reporter/vite.config.ts @@ -10,10 +10,10 @@ export default mergeConfig( coverage: { thresholds: { autoUpdate: true, - functions: 63.63, - lines: 63.36, + functions: 66.66, + lines: 64.07, branches: 42.59, - 
statements: 63.72, + statements: 64.42, }, }, }, diff --git a/packages/pipeline-shacl-sampler/src/sampleStages.ts b/packages/pipeline-shacl-sampler/src/sampleStages.ts index 748f5d6d..1759e5b0 100644 --- a/packages/pipeline-shacl-sampler/src/sampleStages.ts +++ b/packages/pipeline-shacl-sampler/src/sampleStages.ts @@ -1,4 +1,5 @@ import { + ConstantTimeoutPolicy, Stage, SparqlConstructExecutor, SparqlItemSelector, @@ -137,7 +138,7 @@ export async function shaclSampleStages( ), executors: new SparqlConstructExecutor({ query: buildSampleQuery(shape), - timeout, + timeoutPolicy: new ConstantTimeoutPolicy(timeout), }), batchSize, maxConcurrency, diff --git a/packages/pipeline-void/src/stage.ts b/packages/pipeline-void/src/stage.ts index 7fbaf8e5..2d6a9985 100644 --- a/packages/pipeline-void/src/stage.ts +++ b/packages/pipeline-void/src/stage.ts @@ -1,4 +1,5 @@ import { + ConstantTimeoutPolicy, Stage, SparqlConstructExecutor, SparqlItemSelector, @@ -69,7 +70,7 @@ async function createVoidStage( options?.executor?.(query) ?? new SparqlConstructExecutor({ query, - timeout: options?.timeout ?? 60_000, + timeoutPolicy: new ConstantTimeoutPolicy(options?.timeout ?? 60_000), }); if (options?.perClass) { @@ -209,7 +210,7 @@ export function uriSpaces( new UriSpaceExecutor( new SparqlConstructExecutor({ query, - timeout: options?.timeout ?? 60_000, + timeoutPolicy: new ConstantTimeoutPolicy(options?.timeout ?? 60_000), }), uriSpaceMap, ), @@ -230,7 +231,7 @@ export function detectVocabularies( new VocabularyExecutor( new SparqlConstructExecutor({ query, - timeout: options?.timeout ?? 60_000, + timeoutPolicy: new ConstantTimeoutPolicy(options?.timeout ?? 60_000), }), options?.vocabularies ? 
[...defaultVocabularies, ...options.vocabularies] diff --git a/packages/pipeline/README.md b/packages/pipeline/README.md index 359dfa94..554681ca 100644 --- a/packages/pipeline/README.md +++ b/packages/pipeline/README.md @@ -208,6 +208,27 @@ new Stage({ This keeps SPARQL doing the heavy lifting while TypeScript handles the edge cases. See [@lde/pipeline-void](../pipeline-void)'s `VocabularyExecutor` for a real-world example of this pattern. +#### Adaptive timeouts + +By default, every SPARQL request uses the same 5-minute budget. When a pipeline runs against many third-party endpoints, that fixed budget can cost ~80 minutes on a single dataset whose endpoint times out repeatedly on heavy queries. Pass a `TimeoutPolicy` factory to `Pipeline` to fast-fail once an endpoint has shown a run of consecutive timeouts: + +```typescript +import { adaptiveTimeoutPolicy } from '@lde/pipeline'; + +new Pipeline({ + // … + timeoutPolicy: adaptiveTimeoutPolicy({ + default: 300_000, // 5 min while the endpoint is healthy + short: 10_000, // 10 s once it’s flipped to tightened + threshold: 2, // flip after 2 consecutive timeouts + }), +}); +``` + +The factory is invoked once per dataset, so policy state resets between datasets. HTTP 504 from the upstream and client-side `AbortSignal` timeouts both count as `timeout` outcomes; a single successful request relaxes the endpoint back to the default budget. Subscribe-capable policies forward transitions to the `ProgressReporter` via `timeoutTightened` / `timeoutRelaxed` — `ConsoleReporter` prints them as `↘ Tightened` / `↗ Relaxed` lines so operators can distinguish a fast-failed stage from an unexpected speedup. + +Omit `timeoutPolicy` to keep today’s behaviour (`constantTimeoutPolicy(300_000)`). + ### Validation Stages can optionally validate their output quads against a `Validator`. Validation operates on the **combined output of all executors per batch**, not on individual quads or per-executor output. 
A batch produces a complete result set — a self-contained cluster of linked resources — that can be meaningfully matched against SHACL shapes. Even with a single executor, each batch is a complete unit; with multiple executors, shapes that reference triples from different executors are validated correctly. diff --git a/packages/pipeline/src/pipeline.ts b/packages/pipeline/src/pipeline.ts index acee7116..cd04142a 100644 --- a/packages/pipeline/src/pipeline.ts +++ b/packages/pipeline/src/pipeline.ts @@ -24,6 +24,10 @@ import type { ProgressReporter, } from './progressReporter.js'; import type { Validator } from './validator.js'; +import { + ConstantTimeoutPolicy, + type TimeoutPolicy, +} from './sparql/timeoutPolicy.js'; /** Plugin that hooks into pipeline lifecycle events. */ export interface PipelinePlugin { @@ -44,6 +48,17 @@ export interface PipelineOptions { outputDir: string; }; reporter?: ProgressReporter; + /** + * Factory producing a fresh {@link TimeoutPolicy} per dataset. Defaults + * to {@link constantTimeoutPolicy}`(300_000)` so existing call sites + * keep today’s 5-minute fixed budget. + * + * Use {@link adaptiveTimeoutPolicy} to fast-fail stages on endpoints + * that have shown a run of consecutive timeouts. State is per + * {@link TimeoutPolicy} instance, and the Pipeline invokes the factory + * once per dataset so state resets between datasets. + */ + timeoutPolicy?: () => TimeoutPolicy; } /** @@ -132,6 +147,7 @@ export class Pipeline { private readonly distributionResolver: DistributionResolver; private readonly chaining?: PipelineOptions['chaining']; private readonly reporter?: ProgressReporter; + private readonly timeoutPolicyFactory: () => TimeoutPolicy; constructor(options: PipelineOptions) { const hasSubStages = options.stages.some( @@ -163,6 +179,8 @@ export class Pipeline { options.distributionResolver ?? 
new SparqlDistributionResolver(); this.chaining = options.chaining; this.reporter = options.reporter; + this.timeoutPolicyFactory = + options.timeoutPolicy ?? (() => new ConstantTimeoutPolicy(300_000)); } async run(): Promise { @@ -188,6 +206,12 @@ export class Pipeline { private async processDataset(dataset: Dataset): Promise { this.reporter?.datasetStart?.(dataset); + const timeoutPolicy = this.timeoutPolicyFactory(); + const unsubscribe = timeoutPolicy.subscribe?.({ + onTighten: (event) => this.reporter?.timeoutTightened?.(event), + onRelax: (event) => this.reporter?.timeoutRelaxed?.(event), + }); + let resolved; try { resolved = await this.distributionResolver.resolve(dataset, { @@ -228,9 +252,20 @@ export class Pipeline { for (const stage of this.stages) { try { if (stage.stages.length > 0) { - await this.runChain(dataset, resolved.distribution, stage); + await this.runChain( + dataset, + resolved.distribution, + stage, + timeoutPolicy, + ); } else { - await this.runStage(dataset, resolved.distribution, stage); + await this.runStage( + dataset, + resolved.distribution, + stage, + this.writer, + timeoutPolicy, + ); } } catch (error) { this.reporter?.stageFailed?.( @@ -241,6 +276,7 @@ export class Pipeline { } } finally { await this.distributionResolver.cleanup?.(); + unsubscribe?.(); } await this.writer.flush?.(dataset); @@ -279,6 +315,7 @@ export class Pipeline { distribution: Distribution, stage: Stage, writer: Writer = this.writer, + timeoutPolicy?: TimeoutPolicy, ): Promise { this.reporter?.stageStart?.(stage.name); const stageStart = Date.now(); @@ -298,6 +335,7 @@ export class Pipeline { heapUsedBytes: stageMemory.heapUsed, }); }, + timeoutPolicy, }); if (result instanceof NotSupported) { @@ -320,8 +358,15 @@ export class Pipeline { distribution: Distribution, stage: Stage, writer: Writer, + timeoutPolicy?: TimeoutPolicy, ): Promise { - const supported = await this.runStage(dataset, distribution, stage, writer); + const supported = await this.runStage( + 
dataset, + distribution, + stage, + writer, + timeoutPolicy, + ); if (!supported) { throw new Error( `Stage '${stage.name}' returned NotSupported in chained mode`, @@ -333,6 +378,7 @@ export class Pipeline { dataset: Dataset, distribution: Distribution, stage: Stage, + timeoutPolicy?: TimeoutPolicy, ): Promise { const { stageOutputResolver, outputDir } = this.chaining!; const outputFiles: string[] = []; @@ -344,7 +390,13 @@ export class Pipeline { format: 'n-triples', }); - await this.runChainedStage(dataset, distribution, stage, parentWriter); + await this.runChainedStage( + dataset, + distribution, + stage, + parentWriter, + timeoutPolicy, + ); outputFiles.push(parentWriter.getOutputPath(dataset)); // 2. Chain through children. @@ -363,6 +415,7 @@ export class Pipeline { currentDistribution, child, childWriter, + timeoutPolicy, ); outputFiles.push(childWriter.getOutputPath(dataset)); diff --git a/packages/pipeline/src/progressReporter.ts b/packages/pipeline/src/progressReporter.ts index f501bd6f..801f2735 100644 --- a/packages/pipeline/src/progressReporter.ts +++ b/packages/pipeline/src/progressReporter.ts @@ -1,5 +1,6 @@ import type { Dataset, Distribution } from '@lde/dataset'; import type { ValidationReport } from './validator.js'; +import type { TimeoutTransitionEvent } from './sparql/timeoutPolicy.js'; export interface DistributionAnalysisResult { distribution: Distribution; @@ -64,4 +65,15 @@ export interface ProgressReporter { memoryUsageBytes: number; heapUsedBytes: number; }): void; + /** + * Called when a {@link TimeoutPolicy} tightens the budget for an + * endpoint after a run of consecutive timeouts. Lets operators + * distinguish a fast-failed stage from an unexpected speedup. + */ + timeoutTightened?(event: TimeoutTransitionEvent): void; + /** + * Called when a {@link TimeoutPolicy} relaxes the budget back to the + * default after a successful request on a previously-tightened endpoint. 
+ */ + timeoutRelaxed?(event: TimeoutTransitionEvent): void; } diff --git a/packages/pipeline/src/sparql/executor.ts b/packages/pipeline/src/sparql/executor.ts index e99034ee..c535307f 100644 --- a/packages/pipeline/src/sparql/executor.ts +++ b/packages/pipeline/src/sparql/executor.ts @@ -13,6 +13,11 @@ import pRetry from 'p-retry'; import { quadToStringQuad } from 'rdf-string'; import { withDefaultGraph } from './graph.js'; import { injectValues } from './values.js'; +import { + ConstantTimeoutPolicy, + type TimeoutOutcome, + type TimeoutPolicy, +} from './timeoutPolicy.js'; /** * An executor could not run because the dataset lacks a supported distribution. @@ -30,6 +35,17 @@ export interface ExecuteOptions { * When non-empty, a VALUES block is prepended to the WHERE clause. */ bindings?: VariableBindings[]; + + /** + * Per-call {@link TimeoutPolicy}. When supplied, the executor calls + * {@link TimeoutPolicy.beforeRequest} once per attempt (including + * retries), installs an {@link AbortSignal} with the returned budget, + * and reports the outcome via {@link TimeoutPolicy.afterRequest}. + * + * Overrides the executor-level policy passed at construction time. + * Pipeline runners use this to thread the per-dataset policy through. + */ + timeoutPolicy?: TimeoutPolicy; } export interface Executor { @@ -50,10 +66,17 @@ export interface SparqlConstructExecutorOptions { query: string; /** - * Optional timeout for SPARQL queries in milliseconds. - * @default 300000 (5 minutes) + * Per-attempt timeout policy. Defaults to + * `new ConstantTimeoutPolicy(300_000)` so callers that supply nothing get + * the same 5-minute budget as before — but expressed through the new + * {@link TimeoutPolicy} surface so an adaptive policy can be slotted in + * via {@link ExecuteOptions.timeoutPolicy} without changing this default. + * + * Replaces the old `timeout: number` option. Call sites passing + * `timeout: 5000` migrate to + * `timeoutPolicy: constantTimeoutPolicy(5_000)()`. 
*/ - timeout?: number; + timeoutPolicy?: TimeoutPolicy; /** * Number of retries for transient errors (network failures and HTTP 502/503/504). @@ -63,6 +86,15 @@ export interface SparqlConstructExecutorOptions { /** * Optional custom SparqlEndpointFetcher instance. + * + * When supplied, the executor uses this fetcher as-is for every attempt. + * The budget returned by the policy is NOT applied to it: no + * {@link AbortSignal} is passed, so only the fetcher's own configuration + * limits the request. The policy hooks are still called per attempt. + * + * When omitted, the executor builds a fresh + * {@link SparqlEndpointFetcher} per attempt with the per-attempt timeout + * baked in. */ fetcher?: SparqlEndpointFetcher; @@ -126,7 +158,8 @@ export interface SparqlConstructExecutorOptions { export class SparqlConstructExecutor implements Executor { private readonly rawQuery: string; private readonly preParsed?: QueryConstruct; - private readonly fetcher: SparqlEndpointFetcher; + private readonly userFetcher?: SparqlEndpointFetcher; + private readonly defaultPolicy: TimeoutPolicy; private readonly retries: number; private readonly lineBuffer: boolean; private readonly deduplicate: boolean; @@ -146,11 +179,9 @@ export class SparqlConstructExecutor implements Executor { this.preParsed = parsed as QueryConstruct; } - this.fetcher = - options.fetcher ?? - new SparqlEndpointFetcher({ - timeout: options.timeout ?? 300_000, - }); + this.userFetcher = options.fetcher; + this.defaultPolicy = + options.timeoutPolicy ?? new ConstantTimeoutPolicy(300_000); } /** @@ -196,8 +227,11 @@ export class SparqlConstructExecutor implements Executor { assertSafeIri(dataset.iri.toString()); query = query.replaceAll('?dataset', `<${dataset.iri}>`); + const policy = options?.timeoutPolicy ?? 
this.defaultPolicy; + const endpointUrl = endpoint; + const quads = await pRetry( - () => this.fetchQuads(endpoint.toString(), query), + () => this.fetchQuadsWithPolicy(endpointUrl, query, policy), { retries: this.retries, shouldRetry: ({ error }) => isTransientError(error), @@ -207,25 +241,75 @@ export class SparqlConstructExecutor implements Executor { return this.deduplicate ? deduplicateQuads(quads) : quads; } + /** + * Run a single attempt against the endpoint with a per-call abort + * signal derived from {@link TimeoutPolicy.beforeRequest}. Reports the + * outcome via {@link TimeoutPolicy.afterRequest} regardless of whether + * the attempt resolved or threw. + */ + private async fetchQuadsWithPolicy( + endpointUrl: URL, + query: string, + policy: TimeoutPolicy, + ): Promise> { + const timeoutMs = policy.beforeRequest({ endpoint: endpointUrl }); + const fetcher = this.fetcherForAttempt(timeoutMs); + const start = Date.now(); + try { + const quads = await this.fetchQuads( + fetcher, + endpointUrl.toString(), + query, + ); + policy.afterRequest({ + endpoint: endpointUrl, + outcome: 'ok', + durationMs: Date.now() - start, + }); + return quads; + } catch (error) { + policy.afterRequest({ + endpoint: endpointUrl, + outcome: classifyOutcome(error), + durationMs: Date.now() - start, + error, + }); + throw error; + } + } + + /** + * Pick the fetcher to use for a single attempt. When a user-supplied + * fetcher is configured, it is returned as-is and `timeoutMs` is + * ignored: {@link SparqlEndpointFetcher} has its timeout set at + * construction, so the policy budget is not applied to an existing + * instance here. Otherwise a fresh fetcher is built with `timeoutMs` + * as its timeout, which is how the policy budget takes effect. 
+ */ + private fetcherForAttempt(timeoutMs: number): SparqlEndpointFetcher { + if (this.userFetcher) return this.userFetcher; + return new SparqlEndpointFetcher({ timeout: timeoutMs }); + } + /** * Fetch quads from the endpoint, optionally line-buffering the response * stream before it reaches the N3 parser to work around * {@link https://github.com/rdfjs/N3.js/issues/578 | N3.js#578}. */ private async fetchQuads( + fetcher: SparqlEndpointFetcher, endpoint: string, query: string, ): Promise> { if (!this.lineBuffer) { - return this.fetcher.fetchTriples(endpoint, query); + return fetcher.fetchTriples(endpoint, query); } - const [contentType, , responseStream] = - await this.fetcher.fetchRawStream( - endpoint, - query, - SparqlEndpointFetcher.CONTENTTYPE_TURTLE, - ); + const [contentType, , responseStream] = await fetcher.fetchRawStream( + endpoint, + query, + SparqlEndpointFetcher.CONTENTTYPE_TURTLE, + ); return responseStream .pipe(new LineBufferTransform()) .pipe(new StreamParser({ format: contentType })); @@ -276,11 +360,7 @@ export async function readQueryFile(filename: string): Promise { export class LineBufferTransform extends Transform { private remainder = ''; - override _transform( - chunk: Buffer, - _encoding: string, - callback: () => void, - ) { + override _transform(chunk: Buffer, _encoding: string, callback: () => void) { const data = this.remainder + chunk.toString(); const lines = data.split('\n'); this.remainder = lines.pop() ?? ''; @@ -338,3 +418,32 @@ function isTransientError(error: unknown): boolean { const status = Number(match[1]); return status === 502 || status === 503 || status === 504; } + +/** + * Classify a fetch error for {@link TimeoutPolicy} reporting. + * + * - HTTP 504 → `'timeout'`: the upstream reported it ran out of time. This + * is the exact failure mode adaptive timeouts exist to react to. + * - `AbortError` / `TimeoutError`: our own `AbortSignal.timeout()` fired. 
+ * - Anything else → `'error'`: neutral with respect to tightening. + */ +function classifyOutcome(error: unknown): TimeoutOutcome { + if (error instanceof Error) { + if (error.name === 'AbortError' || error.name === 'TimeoutError') { + return 'timeout'; + } + if (error.cause instanceof Error) { + if ( + error.cause.name === 'AbortError' || + error.cause.name === 'TimeoutError' + ) { + return 'timeout'; + } + } + const match = error.message.match(transientStatusPattern); + if (match && Number(match[1]) === 504) { + return 'timeout'; + } + } + return 'error'; +} diff --git a/packages/pipeline/src/sparql/index.ts b/packages/pipeline/src/sparql/index.ts index 38d409a5..9b5b4547 100644 --- a/packages/pipeline/src/sparql/index.ts +++ b/packages/pipeline/src/sparql/index.ts @@ -17,3 +17,17 @@ export { export { injectValues } from './values.js'; export { withDefaultGraph } from './graph.js'; + +export { + AdaptiveTimeoutPolicy, + ConstantTimeoutPolicy, + adaptiveTimeoutPolicy, + constantTimeoutPolicy, + type AdaptiveTimeoutPolicyOptions, + type AfterRequestContext, + type BeforeRequestContext, + type TimeoutOutcome, + type TimeoutPolicy, + type TimeoutPolicyObserver, + type TimeoutTransitionEvent, +} from './timeoutPolicy.js'; diff --git a/packages/pipeline/src/sparql/selector.ts b/packages/pipeline/src/sparql/selector.ts index 026e1cb5..fdc7bedf 100644 --- a/packages/pipeline/src/sparql/selector.ts +++ b/packages/pipeline/src/sparql/selector.ts @@ -8,8 +8,15 @@ import { type QuerySelect, type TermVariable, } from '@traqula/rules-sparql-1-1'; -import type { ItemSelector } from '../stage.js'; +import type { ItemSelector, SelectOptions } from '../stage.js'; import type { VariableBindings } from './executor.js'; +import { + ConstantTimeoutPolicy, + type TimeoutOutcome, + type TimeoutPolicy, +} from './timeoutPolicy.js'; + +const transientStatusPattern = /HTTP status (\d+)/; const parser = new Parser(); const generator = new Generator(); @@ -36,6 +43,15 @@ export interface 
SparqlItemSelectorOptions { maxResults?: number; /** Custom fetcher instance. */ fetcher?: SparqlEndpointFetcher; + /** + * Per-attempt timeout policy. Defaults to + * `new ConstantTimeoutPolicy(300_000)` so callers that supply nothing + * keep today’s 5-minute budget. + * + * Overridden by {@link SelectOptions.timeoutPolicy} when the Pipeline + * threads a per-dataset policy through. + */ + timeoutPolicy?: TimeoutPolicy; } /** @@ -60,7 +76,8 @@ export class SparqlItemSelector implements ItemSelector { private readonly parsed: QuerySelect; private readonly queryLimit?: number; private readonly maxResults?: number; - private readonly fetcher: SparqlEndpointFetcher; + private readonly userFetcher?: SparqlEndpointFetcher; + private readonly defaultPolicy: TimeoutPolicy; constructor(options: SparqlItemSelectorOptions) { const parsed = parser.parse(options.query); @@ -78,16 +95,20 @@ export class SparqlItemSelector implements ItemSelector { this.parsed = parsed as QuerySelect; this.queryLimit = this.parsed.solutionModifiers.limitOffset?.limit; this.maxResults = options.maxResults; - this.fetcher = options.fetcher ?? new SparqlEndpointFetcher(); + this.userFetcher = options.fetcher; + this.defaultPolicy = + options.timeoutPolicy ?? new ConstantTimeoutPolicy(300_000); } async *select( distribution: Distribution, batchSize?: number, + options?: SelectOptions, ): AsyncIterableIterator { if (this.maxResults === 0) return; const basePageSize = this.queryLimit ?? batchSize ?? 10; const endpoint = distribution.accessUrl!; + const policy = options?.timeoutPolicy ?? 
this.defaultPolicy; let offset = 0; let totalYielded = 0; @@ -108,10 +129,11 @@ export class SparqlItemSelector implements ItemSelector { ); const paginatedQuery = generator.generate(this.parsed); - const stream = (await this.fetcher.fetchBindings( - endpoint.toString(), + const stream = await this.fetchBindingsWithPolicy( + endpoint, paginatedQuery, - )) as AsyncIterable>; + policy, + ); let count = 0; for await (const record of stream) { @@ -141,6 +163,63 @@ export class SparqlItemSelector implements ItemSelector { offset += count; } } + + /** + * Run a single SPARQL request against the endpoint, threading the + * per-call timeout from {@link TimeoutPolicy.beforeRequest} and + * reporting the outcome to {@link TimeoutPolicy.afterRequest}. + */ + private async fetchBindingsWithPolicy( + endpoint: URL, + paginatedQuery: string, + policy: TimeoutPolicy, + ): Promise>> { + const timeoutMs = policy.beforeRequest({ endpoint }); + const fetcher = + this.userFetcher ?? new SparqlEndpointFetcher({ timeout: timeoutMs }); + const start = Date.now(); + try { + const stream = (await fetcher.fetchBindings( + endpoint.toString(), + paginatedQuery, + )) as AsyncIterable>; + policy.afterRequest({ + endpoint, + outcome: 'ok', + durationMs: Date.now() - start, + }); + return stream; + } catch (error) { + policy.afterRequest({ + endpoint, + outcome: classifyOutcome(error), + durationMs: Date.now() - start, + error, + }); + throw error; + } + } +} + +function classifyOutcome(error: unknown): TimeoutOutcome { + if (error instanceof Error) { + if (error.name === 'AbortError' || error.name === 'TimeoutError') { + return 'timeout'; + } + if (error.cause instanceof Error) { + if ( + error.cause.name === 'AbortError' || + error.cause.name === 'TimeoutError' + ) { + return 'timeout'; + } + } + const match = error.message.match(transientStatusPattern); + if (match && Number(match[1]) === 504) { + return 'timeout'; + } + } + return 'error'; } function isVariableTerm(v: object): v is 
TermVariable { diff --git a/packages/pipeline/src/sparql/timeoutPolicy.ts b/packages/pipeline/src/sparql/timeoutPolicy.ts new file mode 100644 index 00000000..2519deba --- /dev/null +++ b/packages/pipeline/src/sparql/timeoutPolicy.ts @@ -0,0 +1,277 @@ +/** + * Outcome of a single SPARQL request attempt, as reported back to a + * {@link TimeoutPolicy} so it can adapt the budget for subsequent requests. + * + * - `ok` — the request resolved successfully (the HTTP response was accepted + * and the body started streaming). + * - `timeout` — the per-call {@link AbortSignal} fired, or the endpoint + * returned an HTTP 504 (upstream-reported timeout). Both are semantically + * ‘the endpoint did not deliver in time’. + * - `error` — any other failure (HTTP 4xx/5xx other than 504, parser errors, etc.). + * Neutral with respect to adaptive tightening. + */ +export type TimeoutOutcome = 'ok' | 'error' | 'timeout'; + +/** Context passed to {@link TimeoutPolicy.beforeRequest}. */ +export interface BeforeRequestContext { + /** Endpoint URL the upcoming request will be sent to. */ + endpoint: URL; +} + +/** Context passed to {@link TimeoutPolicy.afterRequest}. */ +export interface AfterRequestContext { + /** Endpoint URL the request was sent to. */ + endpoint: URL; + /** Classified outcome of the request. */ + outcome: TimeoutOutcome; + /** Wall-clock duration of the request attempt, in milliseconds. */ + durationMs: number; + /** The raw error, when {@link outcome} is `'error'` or `'timeout'`. */ + error?: unknown; +} + +/** + * Decides the timeout budget for each SPARQL request and observes the + * outcome. Implementations are free to adapt the budget based on recent + * behaviour — see {@link AdaptiveTimeoutPolicy} for the built-in adaptive + * implementation, and {@link ConstantTimeoutPolicy} for fixed-budget + * behaviour. + * + * Hooks are synchronous because they sit on the request hot path; async + * work is not supported.
+ */ +export interface TimeoutPolicy { + /** + * Returns the timeout (in milliseconds) to apply to the upcoming request. + * Called once per attempt — including retried attempts inside + * `p-retry`, so a retry can already use a tightened budget. + */ + beforeRequest(context: BeforeRequestContext): number; + /** + * Reports the outcome of the request that {@link beforeRequest} budgeted. + * Called once per attempt, regardless of outcome. + */ + afterRequest(context: AfterRequestContext): void; + /** + * Optional observer subscription for state transitions. Returns an + * `unsubscribe` function. Policies that don’t transition (e.g. constant) + * may omit this hook. + */ + subscribe?(observer: TimeoutPolicyObserver): () => void; +} + +/** A single tighten/relax transition for one endpoint. */ +export interface TimeoutTransitionEvent { + /** Endpoint whose timeout budget changed. */ + endpoint: URL; + /** Budget in effect before the transition. */ + fromTimeoutMs: number; + /** Budget in effect after the transition. */ + toTimeoutMs: number; + /** + * Number of consecutive timeouts observed at the moment of the + * transition. For a `relax` event, this is the length of the timeout run + * that was ended by the `ok` that triggered relaxation. + */ + consecutiveTimeouts: number; +} + +/** + * Observer that receives notifications when a policy tightens or relaxes + * its budget for an endpoint. Both hooks are optional. + */ +export interface TimeoutPolicyObserver { + /** Called when the policy starts using the short budget for an endpoint. */ + onTighten?(event: TimeoutTransitionEvent): void; + /** Called when the policy returns to the default budget for an endpoint. */ + onRelax?(event: TimeoutTransitionEvent): void; +} + +/** + * Returns the same timeout for every request. Use this as the + * backwards-compatible default for callers that don’t want adaptive + * behaviour.
+ */ +export class ConstantTimeoutPolicy implements TimeoutPolicy { + constructor(private readonly timeoutMs: number) { + if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) { + throw new Error( + `ConstantTimeoutPolicy: timeoutMs must be a positive finite number, received ${timeoutMs}`, + ); + } + } + + beforeRequest(_context: BeforeRequestContext): number { + return this.timeoutMs; + } + + afterRequest(_context: AfterRequestContext): void { + // Constant policy is stateless — outcomes never affect future budgets. + } +} + +/** Options for {@link AdaptiveTimeoutPolicy}. */ +export interface AdaptiveTimeoutPolicyOptions { + /** Budget applied while the endpoint is healthy. Must be positive. */ + default: number; + /** + * Budget applied after {@link threshold} consecutive timeouts. + * Must satisfy `short < default`. + */ + short: number; + /** + * Number of consecutive timeouts that triggers the switch to {@link short}. + * Must be an integer ≥ 1. + */ + threshold: number; +} + +interface EndpointState { + tightened: boolean; + /** Consecutive timeouts since the last `ok`. */ + consecutiveTimeouts: number; +} + +/** + * Adaptive per-endpoint policy: after a configurable threshold of + * consecutive timeouts on the same endpoint, subsequent requests use a + * shorter budget so the pipeline fast-fails instead of waiting out the + * full default budget. A single successful request relaxes the endpoint + * back to the default budget. + * + * State is in-memory and tied to the policy instance — Pipeline creates a + * fresh instance per dataset so one offending dataset doesn’t poison the + * next. 
+ * + * @example + * ```ts + * const factory = adaptiveTimeoutPolicy({ + * default: 300_000, + * short: 10_000, + * threshold: 2, + * }); + * ``` + */ +export class AdaptiveTimeoutPolicy implements TimeoutPolicy { + private readonly states = new Map(); + private readonly observers = new Set(); + + constructor(private readonly options: AdaptiveTimeoutPolicyOptions) { + if (!Number.isFinite(options.default) || options.default <= 0) { + throw new Error( + `AdaptiveTimeoutPolicy: \`default\` must be a positive finite number, received ${options.default}`, + ); + } + if (!Number.isFinite(options.short) || options.short <= 0) { + throw new Error( + `AdaptiveTimeoutPolicy: \`short\` must be a positive finite number, received ${options.short}`, + ); + } + if (!(options.short < options.default)) { + throw new Error( + `AdaptiveTimeoutPolicy: \`short\` (${options.short}) must be less than \`default\` (${options.default})`, + ); + } + if (!Number.isInteger(options.threshold) || options.threshold < 1) { + throw new Error( + `AdaptiveTimeoutPolicy: \`threshold\` must be an integer ≥ 1, received ${options.threshold}`, + ); + } + } + + beforeRequest(context: BeforeRequestContext): number { + const state = this.stateFor(context.endpoint); + return state.tightened ? 
this.options.short : this.options.default; + } + + afterRequest(context: AfterRequestContext): void { + const state = this.stateFor(context.endpoint); + if (context.outcome === 'ok') { + const wasTightened = state.tightened; + const priorCount = state.consecutiveTimeouts; + state.consecutiveTimeouts = 0; + state.tightened = false; + if (wasTightened) { + this.notify('relax', { + endpoint: context.endpoint, + fromTimeoutMs: this.options.short, + toTimeoutMs: this.options.default, + consecutiveTimeouts: priorCount, + }); + } + return; + } + if (context.outcome === 'timeout') { + state.consecutiveTimeouts += 1; + if ( + !state.tightened && + state.consecutiveTimeouts >= this.options.threshold + ) { + state.tightened = true; + this.notify('tighten', { + endpoint: context.endpoint, + fromTimeoutMs: this.options.default, + toTimeoutMs: this.options.short, + consecutiveTimeouts: state.consecutiveTimeouts, + }); + } + } + // 'error' is neutral. + } + + subscribe(observer: TimeoutPolicyObserver): () => void { + this.observers.add(observer); + return () => { + this.observers.delete(observer); + }; + } + + private stateFor(endpoint: URL): EndpointState { + const key = endpoint.toString(); + let state = this.states.get(key); + if (!state) { + state = { tightened: false, consecutiveTimeouts: 0 }; + this.states.set(key, state); + } + return state; + } + + private notify( + kind: 'tighten' | 'relax', + event: TimeoutTransitionEvent, + ): void { + for (const observer of this.observers) { + const handler = + kind === 'tighten' ? observer.onTighten : observer.onRelax; + handler?.(event); + } + } +} + +/** + * Factory returning a fresh {@link ConstantTimeoutPolicy} on every call. + * Pass this to {@link PipelineOptions.timeoutPolicy}. + */ +export function constantTimeoutPolicy( + timeoutMs: number, +): () => ConstantTimeoutPolicy { + // Validate eagerly so misconfiguration is caught at factory creation, + // not deferred until the first dataset boundary. 
+ + new ConstantTimeoutPolicy(timeoutMs); + return () => new ConstantTimeoutPolicy(timeoutMs); +} + +/** + * Factory returning a fresh {@link AdaptiveTimeoutPolicy} on every call. + * Pass this to {@link PipelineOptions.timeoutPolicy}; the Pipeline invokes + * the factory once per dataset so state resets between datasets. + */ +export function adaptiveTimeoutPolicy( + options: AdaptiveTimeoutPolicyOptions, +): () => AdaptiveTimeoutPolicy { + // Validate eagerly (see {@link constantTimeoutPolicy}). + + new AdaptiveTimeoutPolicy(options); + return () => new AdaptiveTimeoutPolicy(options); +} diff --git a/packages/pipeline/src/stage.ts b/packages/pipeline/src/stage.ts index bf9401ec..131434d1 100644 --- a/packages/pipeline/src/stage.ts +++ b/packages/pipeline/src/stage.ts @@ -2,6 +2,7 @@ import { Dataset, Distribution } from '@lde/dataset'; import type { Quad } from '@rdfjs/types'; import type { Executor, VariableBindings } from './sparql/executor.js'; import { NotSupported } from './sparql/executor.js'; +import type { TimeoutPolicy } from './sparql/timeoutPolicy.js'; import { batch } from './batch.js'; import type { Validator } from './validator.js'; import type { Writer } from './writer/writer.js'; @@ -48,6 +49,19 @@ export interface StageOptions { export interface RunOptions { onProgress?: (itemsProcessed: number, quadsGenerated: number) => void; + /** + * Per-dataset {@link TimeoutPolicy} threaded through to executors and + * item selectors. The Pipeline owns lifecycle (factory invocation per + * dataset), so a single policy instance covers all stages and child + * stages within one dataset. + */ + timeoutPolicy?: TimeoutPolicy; +} + +/** Options accepted by {@link ItemSelector.select}. */ +export interface SelectOptions { + /** Per-call timeout policy. 
*/ + timeoutPolicy?: TimeoutPolicy; } export class Stage { @@ -82,9 +96,12 @@ export class Stage { writer: Writer, options?: RunOptions, ): Promise { + const timeoutPolicy = options?.timeoutPolicy; if (this.itemSelector) { return this.runWithSelector( - this.itemSelector.select(distribution, this.batchSize), + this.itemSelector.select(distribution, this.batchSize, { + timeoutPolicy, + }), dataset, distribution, writer, @@ -92,7 +109,7 @@ export class Stage { ); } - const streams = await this.executeAll(dataset, distribution); + const streams = await this.executeAll(dataset, distribution, timeoutPolicy); if (streams instanceof NotSupported) { return streams; } @@ -206,11 +223,10 @@ export class Stage { // Run all executors for this batch in parallel. const executorOutputs = await Promise.all( this.executors.map(async (executor) => { - const result = await executor.execute( - dataset, - distribution, - { bindings }, - ); + const result = await executor.execute(dataset, distribution, { + bindings, + timeoutPolicy: options?.timeoutPolicy, + }); if (result instanceof NotSupported) return []; hasResults = true; const quads: Quad[] = []; @@ -312,9 +328,12 @@ export class Stage { private async executeAll( dataset: Dataset, distribution: Distribution, + timeoutPolicy: TimeoutPolicy | undefined, ): Promise[] | NotSupported> { const results = await Promise.all( - this.executors.map((executor) => executor.execute(dataset, distribution)), + this.executors.map((executor) => + executor.execute(dataset, distribution, { timeoutPolicy }), + ), ); const streams: AsyncIterable[] = []; @@ -345,5 +364,6 @@ export interface ItemSelector { select( distribution: Distribution, batchSize?: number, + options?: SelectOptions, ): AsyncIterable; } diff --git a/packages/pipeline/test/pipeline.test.ts b/packages/pipeline/test/pipeline.test.ts index 48746e34..34324fa5 100644 --- a/packages/pipeline/test/pipeline.test.ts +++ b/packages/pipeline/test/pipeline.test.ts @@ -96,6 +96,9 @@ function 
makeReporter(): RequiredReporter { datasetSkipped: vi.fn>(), pipelineComplete: vi.fn>(), + timeoutTightened: + vi.fn>(), + timeoutRelaxed: vi.fn>(), }; } @@ -1139,4 +1142,122 @@ describe('Pipeline', () => { await expect(pipeline.run()).resolves.toBeUndefined(); }); }); + + describe('timeoutPolicy', () => { + it('passes a TimeoutPolicy instance to each stage.run', async () => { + const stage = makeStage('stage1'); + + const pipeline = new Pipeline({ + datasetSelector: makeDatasetSelector(dataset), + stages: [stage], + writers: writer, + distributionResolver: makeResolver(makeResolvedDistribution()), + }); + + await pipeline.run(); + + const call = (stage.run as ReturnType).mock.calls[0]; + const options = call[3]; + expect(options.timeoutPolicy).toBeDefined(); + expect(typeof options.timeoutPolicy.beforeRequest).toBe('function'); + expect(typeof options.timeoutPolicy.afterRequest).toBe('function'); + }); + + it('invokes the factory once per dataset', async () => { + const factory = vi.fn().mockImplementation(() => ({ + beforeRequest: () => 300_000, + afterRequest: vi.fn(), + })); + + const datasetA = makeDataset('http://example.org/dataset-a'); + const datasetB = makeDataset('http://example.org/dataset-b'); + + const pipeline = new Pipeline({ + datasetSelector: makeDatasetSelector(datasetA, datasetB), + stages: [makeStage('stage1')], + writers: writer, + distributionResolver: makeResolver(makeResolvedDistribution()), + timeoutPolicy: factory, + }); + + await pipeline.run(); + + expect(factory).toHaveBeenCalledTimes(2); + }); + + it('does not share state between datasets', async () => { + const policies: unknown[] = []; + const factory = vi.fn().mockImplementation(() => { + const policy = { + beforeRequest: () => 300_000, + afterRequest: vi.fn(), + }; + policies.push(policy); + return policy; + }); + + const stage = makeStage('stage1'); + const datasetA = makeDataset('http://example.org/dataset-a'); + const datasetB = makeDataset('http://example.org/dataset-b'); + + 
const pipeline = new Pipeline({ + datasetSelector: makeDatasetSelector(datasetA, datasetB), + stages: [stage], + writers: writer, + distributionResolver: makeResolver(makeResolvedDistribution()), + timeoutPolicy: factory, + }); + + await pipeline.run(); + + const runCalls = (stage.run as ReturnType).mock.calls; + expect(runCalls[0][3].timeoutPolicy).toBe(policies[0]); + expect(runCalls[1][3].timeoutPolicy).toBe(policies[1]); + expect(policies[0]).not.toBe(policies[1]); + }); + + it('forwards onTighten/onRelax transitions to the reporter', async () => { + const tightenEvent = { + endpoint: new URL('http://example.org/sparql'), + fromTimeoutMs: 300_000, + toTimeoutMs: 10_000, + consecutiveTimeouts: 2, + }; + const relaxEvent = { + endpoint: new URL('http://example.org/sparql'), + fromTimeoutMs: 10_000, + toTimeoutMs: 300_000, + consecutiveTimeouts: 0, + }; + const factory = vi.fn().mockImplementation(() => ({ + beforeRequest: () => 300_000, + afterRequest: vi.fn(), + subscribe(observer: { + onTighten?: (event: unknown) => void; + onRelax?: (event: unknown) => void; + }) { + // Fire one of each transition synchronously after subscription + // so the test doesn't depend on stage timing. 
+ observer.onTighten?.(tightenEvent); + observer.onRelax?.(relaxEvent); + return vi.fn(); + }, + })); + + const reporter = makeReporter(); + const pipeline = new Pipeline({ + datasetSelector: makeDatasetSelector(dataset), + stages: [makeStage('stage1')], + writers: writer, + distributionResolver: makeResolver(makeResolvedDistribution()), + timeoutPolicy: factory, + reporter, + }); + + await pipeline.run(); + + expect(reporter.timeoutTightened).toHaveBeenCalledWith(tightenEvent); + expect(reporter.timeoutRelaxed).toHaveBeenCalledWith(relaxEvent); + }); + }); }); diff --git a/packages/pipeline/test/sparql/executor.test.ts b/packages/pipeline/test/sparql/executor.test.ts index 02614ea9..da53edc8 100644 --- a/packages/pipeline/test/sparql/executor.test.ts +++ b/packages/pipeline/test/sparql/executor.test.ts @@ -627,6 +627,255 @@ describe('SparqlConstructExecutor', () => { expect(executor).toBeInstanceOf(SparqlConstructExecutor); }); }); + + describe('timeout policy', () => { + function recordingPolicy() { + const before = vi.fn().mockReturnValue(1234); + const after = vi.fn(); + return { + beforeRequest: before, + afterRequest: after, + }; + } + + function makeDistribution(url = 'http://policy.example.org/sparql') { + return Distribution.sparql(new URL(url)); + } + + function makeDataset(distribution: Distribution) { + return new Dataset({ + iri: new URL('http://example.org/dataset'), + distributions: [distribution], + }); + } + + it('calls beforeRequest and afterRequest({outcome: "ok"}) on success', async () => { + const fetcher = new SparqlEndpointFetcher(); + vi.spyOn(fetcher, 'fetchTriples').mockResolvedValue([] as never); + + const policy = recordingPolicy(); + const executor = new SparqlConstructExecutor({ + query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }', + fetcher, + }); + + const distribution = makeDistribution(); + await executor.execute(makeDataset(distribution), distribution, { + timeoutPolicy: policy, + }); + + 
expect(policy.beforeRequest).toHaveBeenCalledTimes(1); + expect(policy.beforeRequest).toHaveBeenCalledWith({ + endpoint: distribution.accessUrl, + }); + expect(policy.afterRequest).toHaveBeenCalledTimes(1); + expect(policy.afterRequest).toHaveBeenCalledWith( + expect.objectContaining({ + endpoint: distribution.accessUrl, + outcome: 'ok', + }), + ); + }); + + it('classifies HTTP 504 as outcome "timeout"', async () => { + const fetcher = new SparqlEndpointFetcher(); + vi.spyOn(fetcher, 'fetchTriples').mockRejectedValue( + new Error( + 'Invalid SPARQL endpoint response from http://policy.example.org/sparql (HTTP status 504):\nGateway Timeout', + ), + ); + + const policy = recordingPolicy(); + const executor = new SparqlConstructExecutor({ + query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }', + fetcher, + retries: 0, + }); + const distribution = makeDistribution(); + + await expect( + executor.execute(makeDataset(distribution), distribution, { + timeoutPolicy: policy, + }), + ).rejects.toThrow('504'); + + expect(policy.afterRequest).toHaveBeenCalledWith( + expect.objectContaining({ outcome: 'timeout' }), + ); + }); + + it('classifies AbortError as outcome "timeout"', async () => { + const fetcher = new SparqlEndpointFetcher(); + const abortError = new Error('The operation was aborted'); + abortError.name = 'AbortError'; + vi.spyOn(fetcher, 'fetchTriples').mockRejectedValue(abortError); + + const policy = recordingPolicy(); + const executor = new SparqlConstructExecutor({ + query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }', + fetcher, + retries: 0, + }); + const distribution = makeDistribution(); + + await expect( + executor.execute(makeDataset(distribution), distribution, { + timeoutPolicy: policy, + }), + ).rejects.toThrow(); + + expect(policy.afterRequest).toHaveBeenCalledWith( + expect.objectContaining({ outcome: 'timeout' }), + ); + }); + + it('classifies TimeoutError (AbortSignal.timeout) as outcome "timeout"', async () => { + const fetcher = new 
SparqlEndpointFetcher(); + const timeoutError = new Error('The operation timed out'); + timeoutError.name = 'TimeoutError'; + vi.spyOn(fetcher, 'fetchTriples').mockRejectedValue(timeoutError); + + const policy = recordingPolicy(); + const executor = new SparqlConstructExecutor({ + query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }', + fetcher, + retries: 0, + }); + const distribution = makeDistribution(); + + await expect( + executor.execute(makeDataset(distribution), distribution, { + timeoutPolicy: policy, + }), + ).rejects.toThrow(); + + expect(policy.afterRequest).toHaveBeenCalledWith( + expect.objectContaining({ outcome: 'timeout' }), + ); + }); + + it('classifies HTTP 400 as outcome "error" (neutral)', async () => { + const fetcher = new SparqlEndpointFetcher(); + vi.spyOn(fetcher, 'fetchTriples').mockRejectedValue( + new Error( + 'Invalid SPARQL endpoint response from http://policy.example.org/sparql (HTTP status 400):\nBad Request', + ), + ); + + const policy = recordingPolicy(); + const executor = new SparqlConstructExecutor({ + query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }', + fetcher, + retries: 0, + }); + const distribution = makeDistribution(); + + await expect( + executor.execute(makeDataset(distribution), distribution, { + timeoutPolicy: policy, + }), + ).rejects.toThrow(); + + expect(policy.afterRequest).toHaveBeenCalledWith( + expect.objectContaining({ outcome: 'error' }), + ); + }); + + it('invokes the policy per attempt inside pRetry', async () => { + const fetcher = new SparqlEndpointFetcher(); + vi.spyOn(fetcher, 'fetchTriples') + .mockRejectedValueOnce( + new Error( + 'Invalid SPARQL endpoint response from http://policy.example.org/sparql (HTTP status 504):\nGateway Timeout', + ), + ) + .mockResolvedValueOnce([] as never); + + const policy = recordingPolicy(); + const executor = new SparqlConstructExecutor({ + query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }', + fetcher, + }); + const distribution = makeDistribution(); + + await 
executor.execute(makeDataset(distribution), distribution, { + timeoutPolicy: policy, + }); + + expect(policy.beforeRequest).toHaveBeenCalledTimes(2); + expect(policy.afterRequest).toHaveBeenCalledTimes(2); + expect(policy.afterRequest).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ outcome: 'timeout' }), + ); + expect(policy.afterRequest).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ outcome: 'ok' }), + ); + }); + + it('falls back to the executor-level policy when ExecuteOptions omits one', async () => { + const fetcher = new SparqlEndpointFetcher(); + vi.spyOn(fetcher, 'fetchTriples').mockResolvedValue([] as never); + + const policy = recordingPolicy(); + const executor = new SparqlConstructExecutor({ + query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }', + fetcher, + timeoutPolicy: policy, + }); + const distribution = makeDistribution(); + + await executor.execute(makeDataset(distribution), distribution); + + expect(policy.beforeRequest).toHaveBeenCalledTimes(1); + expect(policy.afterRequest).toHaveBeenCalledTimes(1); + }); + + it('aborts the underlying fetch when the policy budget elapses', async () => { + const slowFetch = vi + .fn< + (input: Request | string, init?: RequestInit) => Promise + >() + .mockImplementation((_input, init) => { + return new Promise((_resolve, reject) => { + init?.signal?.addEventListener('abort', () => { + const error = new Error('aborted'); + error.name = 'AbortError'; + reject(error); + }); + }); + }); + const fetcher = new SparqlEndpointFetcher({ + fetch: slowFetch, + timeout: 10, + }); + + const policy = { + beforeRequest: vi.fn().mockReturnValue(10), + afterRequest: vi.fn(), + }; + const executor = new SparqlConstructExecutor({ + query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }', + fetcher, + retries: 0, + }); + const distribution = makeDistribution(); + + await expect( + executor.execute(makeDataset(distribution), distribution, { + timeoutPolicy: policy, + }), + ).rejects.toThrow(); + + 
expect(policy.afterRequest).toHaveBeenCalledWith( + expect.objectContaining({ outcome: 'timeout' }), + ); + expect(slowFetch).toHaveBeenCalled(); + const init = slowFetch.mock.calls[0][1]; + expect(init?.signal).toBeInstanceOf(AbortSignal); + }); + }); }); describe('LineBufferTransform', () => { @@ -640,7 +889,9 @@ describe('LineBufferTransform', () => { it('passes complete lines through', async () => { const transform = new LineBufferTransform(); - const input = Readable.from(['

"hello"@nl .\n

"world"@en .\n']); + const input = Readable.from([ + '

"hello"@nl .\n

"world"@en .\n', + ]); const result = await collect(input.pipe(transform)); expect(result).toBe('

"hello"@nl .\n

"world"@en .\n'); }); @@ -648,10 +899,7 @@ describe('LineBufferTransform', () => { it('buffers a line split across chunks', async () => { const transform = new LineBufferTransform(); // Simulate a language tag split across chunk boundaries - const input = Readable.from([ - '

"hallo"@', - 'nl-nl .\n', - ]); + const input = Readable.from(['

"hallo"@', 'nl-nl .\n']); const result = await collect(input.pipe(transform)); expect(result).toBe('

"hallo"@nl-nl .\n'); }); diff --git a/packages/pipeline/test/sparql/selector.test.ts b/packages/pipeline/test/sparql/selector.test.ts index f2707d4c..558fa0af 100644 --- a/packages/pipeline/test/sparql/selector.test.ts +++ b/packages/pipeline/test/sparql/selector.test.ts @@ -501,4 +501,134 @@ describe('SparqlItemSelector', () => { expect(mockFetcher.fetchBindings).not.toHaveBeenCalled(); }); }); + + describe('timeout policy', () => { + function recordingPolicy() { + return { + beforeRequest: vi.fn().mockReturnValue(5_000), + afterRequest: vi.fn(), + }; + } + + it('calls beforeRequest and afterRequest({outcome: "ok"}) per page', async () => { + const mockFetcher = { + fetchBindings: vi + .fn() + .mockImplementationOnce(() => + bindingsStream([ + { uri: namedNode('http://example.com/1') }, + { uri: namedNode('http://example.com/2') }, + ]), + ) + .mockImplementationOnce(() => + bindingsStream([{ uri: namedNode('http://example.com/3') }]), + ), + }; + + const policy = recordingPolicy(); + const selector = new SparqlItemSelector({ + query, + fetcher: mockFetcher as never, + }); + + const rows: VariableBindings[] = []; + for await (const row of selector.select(distribution, 2, { + timeoutPolicy: policy, + })) { + rows.push(row); + } + + expect(rows).toHaveLength(3); + expect(policy.beforeRequest).toHaveBeenCalledTimes(2); + expect(policy.beforeRequest).toHaveBeenCalledWith({ + endpoint: distribution.accessUrl, + }); + expect(policy.afterRequest).toHaveBeenCalledTimes(2); + expect(policy.afterRequest).toHaveBeenCalledWith( + expect.objectContaining({ outcome: 'ok' }), + ); + }); + + it('reports HTTP 504 as outcome "timeout"', async () => { + const mockFetcher = { + fetchBindings: vi + .fn() + .mockRejectedValue( + new Error( + 'Invalid SPARQL endpoint response from http://example.com/sparql (HTTP status 504):\nGateway Timeout', + ), + ), + }; + + const policy = recordingPolicy(); + const selector = new SparqlItemSelector({ + query, + fetcher: mockFetcher as never, + }); 
+ + const iterate = async () => { + for await (const _row of selector.select(distribution, 10, { + timeoutPolicy: policy, + })) { + // consume + } + }; + + await expect(iterate()).rejects.toThrow('504'); + expect(policy.afterRequest).toHaveBeenCalledWith( + expect.objectContaining({ outcome: 'timeout' }), + ); + }); + + it('reports non-timeout errors as outcome "error"', async () => { + const mockFetcher = { + fetchBindings: vi + .fn() + .mockRejectedValue( + new Error( + 'Invalid SPARQL endpoint response from http://example.com/sparql (HTTP status 400):\nBad Request', + ), + ), + }; + + const policy = recordingPolicy(); + const selector = new SparqlItemSelector({ + query, + fetcher: mockFetcher as never, + }); + + const iterate = async () => { + for await (const _row of selector.select(distribution, 10, { + timeoutPolicy: policy, + })) { + // consume + } + }; + + await expect(iterate()).rejects.toThrow(); + expect(policy.afterRequest).toHaveBeenCalledWith( + expect.objectContaining({ outcome: 'error' }), + ); + }); + + it('falls back to the selector-level policy when select() omits one', async () => { + const mockFetcher = { + fetchBindings: vi.fn().mockImplementation(() => bindingsStream([])), + }; + + const policy = recordingPolicy(); + const selector = new SparqlItemSelector({ + query, + fetcher: mockFetcher as never, + timeoutPolicy: policy, + }); + + for await (const _row of selector.select(distribution, 10)) { + // consume + } + + expect(policy.beforeRequest).toHaveBeenCalledTimes(1); + expect(policy.afterRequest).toHaveBeenCalledTimes(1); + }); + }); }); diff --git a/packages/pipeline/test/sparql/timeoutPolicy.test.ts b/packages/pipeline/test/sparql/timeoutPolicy.test.ts new file mode 100644 index 00000000..b658f899 --- /dev/null +++ b/packages/pipeline/test/sparql/timeoutPolicy.test.ts @@ -0,0 +1,421 @@ +import { describe, it, expect, vi } from 'vitest'; +import { + AdaptiveTimeoutPolicy, + ConstantTimeoutPolicy, + adaptiveTimeoutPolicy, + 
constantTimeoutPolicy, +} from '../../src/sparql/timeoutPolicy.js'; + +const endpointA = new URL('https://example.org/a/sparql'); +const endpointB = new URL('https://example.org/b/sparql'); + +describe('ConstantTimeoutPolicy', () => { + it('returns the configured timeout from beforeRequest', () => { + const policy = new ConstantTimeoutPolicy(1234); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(1234); + }); + + it('ignores outcomes in afterRequest', () => { + const policy = new ConstantTimeoutPolicy(1000); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 500, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(1000); + }); + + it('rejects non-positive timeouts', () => { + expect(() => new ConstantTimeoutPolicy(0)).toThrow(); + expect(() => new ConstantTimeoutPolicy(-1)).toThrow(); + expect(() => new ConstantTimeoutPolicy(Number.NaN)).toThrow(); + }); +}); + +describe('constantTimeoutPolicy factory', () => { + it('returns a factory that builds independent ConstantTimeoutPolicy instances', () => { + const factory = constantTimeoutPolicy(5_000); + const a = factory(); + const b = factory(); + expect(a).toBeInstanceOf(ConstantTimeoutPolicy); + expect(a).not.toBe(b); + expect(a.beforeRequest({ endpoint: endpointA })).toBe(5_000); + }); +}); + +describe('AdaptiveTimeoutPolicy', () => { + describe('construction-time validation', () => { + it('throws when `short` >= `default`', () => { + expect( + () => + new AdaptiveTimeoutPolicy({ + default: 1000, + short: 1000, + threshold: 2, + }), + ).toThrow(); + expect( + () => + new AdaptiveTimeoutPolicy({ + default: 1000, + short: 2000, + threshold: 2, + }), + ).toThrow(); + }); + + it('throws when `threshold` < 1', () => { + expect( + () => + new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 0, + }), + ).toThrow(); + expect( + () => + new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: -1, + }), + ).toThrow(); + }); + + 
it('throws when timeouts are non-positive', () => { + expect( + () => + new AdaptiveTimeoutPolicy({ + default: 0, + short: -1, + threshold: 1, + }), + ).toThrow(); + }); + }); + + describe('state machine', () => { + it('returns default before any events', () => { + const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 2, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(1000); + }); + + it('tightens after exactly threshold=1 consecutive timeouts', () => { + const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 1, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(1000); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(100); + }); + + it('tightens after exactly threshold=2 consecutive timeouts', () => { + const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 2, + }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(1000); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(100); + }); + + it('tightens after exactly threshold=3 consecutive timeouts', () => { + const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 3, + }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(1000); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(100); + }); + + it('stays tightened on further timeouts', () => { 
+ const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 1, + }); + for (let i = 0; i < 5; i++) { + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + } + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(100); + }); + + it('relaxes to default on a single ok', () => { + const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 1, + }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(100); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'ok', + durationMs: 80, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(1000); + }); + + it('resets the counter on ok so subsequent timeouts must accumulate again', () => { + const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 2, + }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'ok', + durationMs: 80, + }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(1000); + }); + + it('treats `error` outcomes as neutral (neither tighten nor relax)', () => { + const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 1, + }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'error', + durationMs: 50, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(1000); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(100); + // An `error` while tightened keeps state tightened. 
+ policy.afterRequest({ + endpoint: endpointA, + outcome: 'error', + durationMs: 50, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(100); + }); + + it('isolates state per endpoint', () => { + const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 1, + }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(policy.beforeRequest({ endpoint: endpointA })).toBe(100); + expect(policy.beforeRequest({ endpoint: endpointB })).toBe(1000); + }); + }); + + describe('transition events', () => { + it('emits onTighten when state flips to tightened', () => { + const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 2, + }); + const onTighten = vi.fn(); + const onRelax = vi.fn(); + policy.subscribe({ onTighten, onRelax }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(onTighten).not.toHaveBeenCalled(); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(onTighten).toHaveBeenCalledTimes(1); + expect(onTighten).toHaveBeenCalledWith( + expect.objectContaining({ + endpoint: endpointA, + consecutiveTimeouts: 2, + fromTimeoutMs: 1000, + toTimeoutMs: 100, + }), + ); + expect(onRelax).not.toHaveBeenCalled(); + }); + + it('does not re-emit onTighten while already tightened', () => { + const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 1, + }); + const onTighten = vi.fn(); + policy.subscribe({ onTighten }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(onTighten).toHaveBeenCalledTimes(1); + }); + + it('emits onRelax when an ok arrives in tightened state', () => { + const 
policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 1, + }); + const onRelax = vi.fn(); + policy.subscribe({ onRelax }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'ok', + durationMs: 80, + }); + expect(onRelax).toHaveBeenCalledTimes(1); + expect(onRelax).toHaveBeenCalledWith( + expect.objectContaining({ + endpoint: endpointA, + fromTimeoutMs: 100, + toTimeoutMs: 1000, + }), + ); + }); + + it('does not emit onRelax when ok arrives in healthy state', () => { + const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 1, + }); + const onRelax = vi.fn(); + policy.subscribe({ onRelax }); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'ok', + durationMs: 80, + }); + expect(onRelax).not.toHaveBeenCalled(); + }); + + it('unsubscribe stops further notifications', () => { + const policy = new AdaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 1, + }); + const onTighten = vi.fn(); + const unsubscribe = policy.subscribe({ onTighten }); + unsubscribe(); + policy.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + expect(onTighten).not.toHaveBeenCalled(); + }); + }); +}); + +describe('adaptiveTimeoutPolicy factory', () => { + it('returns a factory that builds independent AdaptiveTimeoutPolicy instances', () => { + const factory = adaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 2, + }); + const a = factory(); + const b = factory(); + expect(a).toBeInstanceOf(AdaptiveTimeoutPolicy); + expect(a).not.toBe(b); + }); + + it('isolates state across factory invocations', () => { + const factory = adaptiveTimeoutPolicy({ + default: 1000, + short: 100, + threshold: 1, + }); + const a = factory(); + a.afterRequest({ + endpoint: endpointA, + outcome: 'timeout', + durationMs: 1000, + }); + const b = factory(); + expect(b.beforeRequest({ 
endpoint: endpointA })).toBe(1000); + }); +}); diff --git a/packages/pipeline/test/stage.test.ts b/packages/pipeline/test/stage.test.ts index 03d7efa0..60b4ce0f 100644 --- a/packages/pipeline/test/stage.test.ts +++ b/packages/pipeline/test/stage.test.ts @@ -190,9 +190,11 @@ describe('Stage', () => { expect(result).not.toBeInstanceOf(NotSupported); expect(executor.execute).toHaveBeenCalledOnce(); - expect(executor.execute).toHaveBeenCalledWith(dataset, distribution, { - bindings, - }); + expect(executor.execute).toHaveBeenCalledWith( + dataset, + distribution, + expect.objectContaining({ bindings }), + ); }); it('batches bindings across multiple executor calls', async () => { @@ -215,12 +217,18 @@ describe('Stage', () => { expect(result).not.toBeInstanceOf(NotSupported); expect(executor.execute).toHaveBeenCalledTimes(2); - expect(executor.execute).toHaveBeenNthCalledWith(1, dataset, distribution, { - bindings: [bindings[0], bindings[1]], - }); - expect(executor.execute).toHaveBeenNthCalledWith(2, dataset, distribution, { - bindings: [bindings[2]], - }); + expect(executor.execute).toHaveBeenNthCalledWith( + 1, + dataset, + distribution, + expect.objectContaining({ bindings: [bindings[0], bindings[1]] }), + ); + expect(executor.execute).toHaveBeenNthCalledWith( + 2, + dataset, + distribution, + expect.objectContaining({ bindings: [bindings[2]] }), + ); }); it('uses custom batchSize', async () => { @@ -271,7 +279,11 @@ describe('Stage', () => { const result = await stage.run(dataset, distribution, writer); expect(result).not.toBeInstanceOf(NotSupported); expect(writer.quads).toEqual([q1, q2]); - expect(executor.execute).toHaveBeenCalledWith(dataset, distribution); + expect(executor.execute).toHaveBeenCalledWith( + dataset, + distribution, + expect.objectContaining({}), + ); }); it('forwards distribution to executors', async () => { @@ -292,6 +304,7 @@ describe('Stage', () => { expect(executor.execute).toHaveBeenCalledWith( dataset, namedGraphDistribution, + 
expect.objectContaining({}), ); }); @@ -312,10 +325,16 @@ describe('Stage', () => { const result = await stage.run(dataset, distribution, writer); expect(result).not.toBeInstanceOf(NotSupported); - expect(selectFn).toHaveBeenCalledWith(distribution, 10); - expect(executor.execute).toHaveBeenCalledWith(dataset, distribution, { - bindings, - }); + expect(selectFn).toHaveBeenCalledWith( + distribution, + 10, + expect.objectContaining({}), + ); + expect(executor.execute).toHaveBeenCalledWith( + dataset, + distribution, + expect.objectContaining({ bindings }), + ); }); it('passes batchSize to item selector', async () => { @@ -334,7 +353,11 @@ describe('Stage', () => { const writer = collectingWriter(); await stage.run(dataset, distribution, writer); - expect(selectFn).toHaveBeenCalledWith(distribution, 500); + expect(selectFn).toHaveBeenCalledWith( + distribution, + 500, + expect.objectContaining({}), + ); }); describe('sub-stages', () => { diff --git a/packages/pipeline/vite.config.ts b/packages/pipeline/vite.config.ts index 9b0b8cac..afc94101 100644 --- a/packages/pipeline/vite.config.ts +++ b/packages/pipeline/vite.config.ts @@ -11,10 +11,10 @@ export default mergeConfig( coverage: { thresholds: { autoUpdate: true, - functions: 93.38, - lines: 93.46, - branches: 89.64, - statements: 92.89, + functions: 94.3, + lines: 93.67, + branches: 89.07, + statements: 93.19, }, }, }, From ecab51d03d5f630ece771fe449610cec88d70d37 Mon Sep 17 00:00:00 2001 From: David de Boer Date: Thu, 28 May 2026 14:30:17 +0200 Subject: [PATCH 2/6] docs: drop ADR 0003 Issue #419 already captures the rationale and decisions; the ADR duplicated rather than added context. 
--- ...3-adaptive-per-endpoint-sparql-timeouts.md | 50 ------------------- 1 file changed, 50 deletions(-) delete mode 100644 docs/decisions/0003-adaptive-per-endpoint-sparql-timeouts.md diff --git a/docs/decisions/0003-adaptive-per-endpoint-sparql-timeouts.md b/docs/decisions/0003-adaptive-per-endpoint-sparql-timeouts.md deleted file mode 100644 index 17b58fc2..00000000 --- a/docs/decisions/0003-adaptive-per-endpoint-sparql-timeouts.md +++ /dev/null @@ -1,50 +0,0 @@ -# 3. Adopt adaptive per-endpoint SPARQL timeouts - -Date: 2026-05-28 - -## Status - -Accepted - -## Context - -The DKG pipeline analyses ~500 third-party SPARQL endpoints. Some endpoints (notably TriplyDB-hosted ones) serve light queries quickly but reliably time out on heavy analytical queries, returning HTTP 504 after their own internal query budget elapses. With a fixed 5-minute client-side timeout, a single offending dataset can spend ~80 minutes cycling through stage-level timeouts before the pipeline moves on. - -We need the pipeline to learn from “this endpoint just timed out twice” and apply a tighter budget to subsequent requests against the same endpoint, so light queries still succeed while heavy queries fast-fail. - -## Decision - -`SparqlConstructExecutor` and `SparqlItemSelector` accept a per-call `TimeoutPolicy` injected by the `Pipeline` at dataset boundaries. Two built-in policies ship: - -- `ConstantTimeoutPolicy` – returns the same budget for every request. Used as the implicit default (`constantTimeoutPolicy(300_000)`) when `PipelineOptions.timeoutPolicy` is omitted, so existing call sites see no behavioural change. -- `AdaptiveTimeoutPolicy` – tracks consecutive timeouts per endpoint within a dataset. After `threshold` consecutive timeouts, subsequent requests against that endpoint use the short budget; a single successful request relaxes back to the default budget. Construction validates `short < default` and `threshold ≥ 1`. 
- -The `TimeoutPolicy` interface is intentionally narrow: - -```ts -interface TimeoutPolicy { - beforeRequest(context: { endpoint: URL }): number; - afterRequest(context: { - endpoint: URL; - outcome: 'ok' | 'error' | 'timeout'; - durationMs: number; - error?: unknown; - }): void; - subscribe?(observer: TimeoutPolicyObserver): () => void; -} -``` - -Key decisions: - -- **Per-dataset scope.** `PipelineOptions.timeoutPolicy` accepts a `() => TimeoutPolicy` factory. The pipeline invokes it once per dataset so one bad dataset doesn’t poison the next. -- **Per-attempt hooks.** Policy hooks fire inside the `pRetry` callback, not around it, so a retried timeout already runs with the tightened budget. -- **Outcome classification.** HTTP 504 from upstream counts as a `timeout`, alongside `AbortError` / `TimeoutError` from our own `AbortSignal.timeout()`. All other errors are neutral (`error`). -- **Breaking change.** `SparqlConstructExecutorOptions.timeout: number` is removed and replaced by `timeoutPolicy?: TimeoutPolicy`. Pre-release per `AGENTS.md`, so the cleaner API is preferred over a permanent `number | TimeoutPolicy` union. Call sites passing `timeout: 5000` migrate to `timeoutPolicy: new ConstantTimeoutPolicy(5_000)`. -- **Observability.** `ProgressReporter` gains optional `timeoutTightened` / `timeoutRelaxed` hooks. The `Pipeline` subscribes to the policy at each dataset boundary and forwards transitions. `ConsoleReporter` prints `↘ Tightened` / `↗ Relaxed` lines so operators can distinguish a fast-failed stage from an unexpected speedup. -- **No off-the-shelf library.** Circuit breakers (`cockatiel`, `opossum`) implement deny semantics; we want to keep serving requests with a shorter budget. The homegrown ~50-line state machine is the right fit; the interface is stable enough to swap in a fuller resilience framework later if other requirements emerge. 
- -## Consequences - -- DKG can opt into `adaptiveTimeoutPolicy({ default: 300_000, short: 10_000, threshold: 2 })` once `@lde/pipeline` is released. Expected effect on the razu.nl case: worst-case wall-clock per troublesome dataset drops from ~80 min to ~15 min, with the same partial output preserved. -- Integrators implementing a custom `TimeoutPolicy` can plug in shared state across datasets by closing over it in the factory. -- `Executor` and `ItemSelector` implementations that thread `ExecuteOptions` / `SelectOptions` through to inner SPARQL calls require no code changes; ones that ignore the options pay only the cost of not benefiting from adaptive behaviour. From 839065a94d51d5a296f70c8ef8a7f728408b9f72 Mon Sep 17 00:00:00 2001 From: David de Boer Date: Thu, 28 May 2026 14:34:22 +0200 Subject: [PATCH 3/6] refactor(pipeline): rename AdaptiveTimeoutPolicy options MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - default → defaultMs - short → tightenedMs - tightenAfterTimeouts replaces threshold The new names carry explicit units and echo the tighten/relax vocabulary used by the state machine, the transition events, and the console output. 
--- packages/pipeline/README.md | 6 +- packages/pipeline/src/sparql/timeoutPolicy.ts | 55 +++---- .../test/sparql/timeoutPolicy.test.ts | 136 +++++++++--------- 3 files changed, 100 insertions(+), 97 deletions(-) diff --git a/packages/pipeline/README.md b/packages/pipeline/README.md index 554681ca..2b7de9bd 100644 --- a/packages/pipeline/README.md +++ b/packages/pipeline/README.md @@ -218,9 +218,9 @@ import { adaptiveTimeoutPolicy } from '@lde/pipeline'; new Pipeline({ // … timeoutPolicy: adaptiveTimeoutPolicy({ - default: 300_000, // 5 min while the endpoint is healthy - short: 10_000, // 10 s once it’s flipped to tightened - threshold: 2, // flip after 2 consecutive timeouts + defaultMs: 300_000, // 5 min while the endpoint is healthy + tightenedMs: 10_000, // 10 s once the endpoint is flipped to tightened + tightenAfterTimeouts: 2, // flip after 2 consecutive timeouts }), }); ``` diff --git a/packages/pipeline/src/sparql/timeoutPolicy.ts b/packages/pipeline/src/sparql/timeoutPolicy.ts index 2519deba..1dc9686d 100644 --- a/packages/pipeline/src/sparql/timeoutPolicy.ts +++ b/packages/pipeline/src/sparql/timeoutPolicy.ts @@ -113,17 +113,17 @@ export class ConstantTimeoutPolicy implements TimeoutPolicy { /** Options for {@link AdaptiveTimeoutPolicy}. */ export interface AdaptiveTimeoutPolicyOptions { /** Budget applied while the endpoint is healthy. Must be positive. */ - default: number; + defaultMs: number; /** - * Budget applied after {@link threshold} consecutive timeouts. - * Must satisfy `short < default`. + * Budget applied after {@link tightenAfterTimeouts} consecutive timeouts. + * Must satisfy `tightenedMs < defaultMs`. */ - short: number; + tightenedMs: number; /** - * Number of consecutive timeouts that triggers the switch to {@link short}. - * Must be an integer ≥ 1. + * Number of consecutive timeouts that flips an endpoint to the + * {@link tightenedMs} budget. Must be an integer ≥ 1. 
*/ - threshold: number; + tightenAfterTimeouts: number; } interface EndpointState { @@ -146,9 +146,9 @@ interface EndpointState { * @example * ```ts * const factory = adaptiveTimeoutPolicy({ - * default: 300_000, - * short: 10_000, - * threshold: 2, + * defaultMs: 300_000, + * tightenedMs: 10_000, + * tightenAfterTimeouts: 2, * }); * ``` */ @@ -157,31 +157,34 @@ export class AdaptiveTimeoutPolicy implements TimeoutPolicy { private readonly observers = new Set(); constructor(private readonly options: AdaptiveTimeoutPolicyOptions) { - if (!Number.isFinite(options.default) || options.default <= 0) { + if (!Number.isFinite(options.defaultMs) || options.defaultMs <= 0) { throw new Error( - `AdaptiveTimeoutPolicy: \`default\` must be a positive finite number, received ${options.default}`, + `AdaptiveTimeoutPolicy: \`defaultMs\` must be a positive finite number, received ${options.defaultMs}`, ); } - if (!Number.isFinite(options.short) || options.short <= 0) { + if (!Number.isFinite(options.tightenedMs) || options.tightenedMs <= 0) { throw new Error( - `AdaptiveTimeoutPolicy: \`short\` must be a positive finite number, received ${options.short}`, + `AdaptiveTimeoutPolicy: \`tightenedMs\` must be a positive finite number, received ${options.tightenedMs}`, ); } - if (!(options.short < options.default)) { + if (!(options.tightenedMs < options.defaultMs)) { throw new Error( - `AdaptiveTimeoutPolicy: \`short\` (${options.short}) must be less than \`default\` (${options.default})`, + `AdaptiveTimeoutPolicy: \`tightenedMs\` (${options.tightenedMs}) must be less than \`defaultMs\` (${options.defaultMs})`, ); } - if (!Number.isInteger(options.threshold) || options.threshold < 1) { + if ( + !Number.isInteger(options.tightenAfterTimeouts) || + options.tightenAfterTimeouts < 1 + ) { throw new Error( - `AdaptiveTimeoutPolicy: \`threshold\` must be an integer ≥ 1, received ${options.threshold}`, + `AdaptiveTimeoutPolicy: \`tightenAfterTimeouts\` must be an integer ≥ 1, received 
${options.tightenAfterTimeouts}`, ); } } beforeRequest(context: BeforeRequestContext): number { const state = this.stateFor(context.endpoint); - return state.tightened ? this.options.short : this.options.default; + return state.tightened ? this.options.tightenedMs : this.options.defaultMs; } afterRequest(context: AfterRequestContext): void { @@ -194,8 +197,8 @@ export class AdaptiveTimeoutPolicy implements TimeoutPolicy { if (wasTightened) { this.notify('relax', { endpoint: context.endpoint, - fromTimeoutMs: this.options.short, - toTimeoutMs: this.options.default, + fromTimeoutMs: this.options.tightenedMs, + toTimeoutMs: this.options.defaultMs, consecutiveTimeouts: priorCount, }); } @@ -205,13 +208,13 @@ export class AdaptiveTimeoutPolicy implements TimeoutPolicy { state.consecutiveTimeouts += 1; if ( !state.tightened && - state.consecutiveTimeouts >= this.options.threshold + state.consecutiveTimeouts >= this.options.tightenAfterTimeouts ) { state.tightened = true; this.notify('tighten', { endpoint: context.endpoint, - fromTimeoutMs: this.options.default, - toTimeoutMs: this.options.short, + fromTimeoutMs: this.options.defaultMs, + toTimeoutMs: this.options.tightenedMs, consecutiveTimeouts: state.consecutiveTimeouts, }); } @@ -257,7 +260,7 @@ export function constantTimeoutPolicy( ): () => ConstantTimeoutPolicy { // Validate eagerly so misconfiguration is caught at factory creation, // not deferred until the first dataset boundary. - + new ConstantTimeoutPolicy(timeoutMs); return () => new ConstantTimeoutPolicy(timeoutMs); } @@ -271,7 +274,7 @@ export function adaptiveTimeoutPolicy( options: AdaptiveTimeoutPolicyOptions, ): () => AdaptiveTimeoutPolicy { // Validate eagerly (see {@link constantTimeoutPolicy}). 
- + new AdaptiveTimeoutPolicy(options); return () => new AdaptiveTimeoutPolicy(options); } diff --git a/packages/pipeline/test/sparql/timeoutPolicy.test.ts b/packages/pipeline/test/sparql/timeoutPolicy.test.ts index b658f899..075c19d8 100644 --- a/packages/pipeline/test/sparql/timeoutPolicy.test.ts +++ b/packages/pipeline/test/sparql/timeoutPolicy.test.ts @@ -45,40 +45,40 @@ describe('constantTimeoutPolicy factory', () => { describe('AdaptiveTimeoutPolicy', () => { describe('construction-time validation', () => { - it('throws when `short` >= `default`', () => { + it('throws when `tightenedMs` >= `defaultMs`', () => { expect( () => new AdaptiveTimeoutPolicy({ - default: 1000, - short: 1000, - threshold: 2, + defaultMs: 1000, + tightenedMs: 1000, + tightenAfterTimeouts: 2, }), ).toThrow(); expect( () => new AdaptiveTimeoutPolicy({ - default: 1000, - short: 2000, - threshold: 2, + defaultMs: 1000, + tightenedMs: 2000, + tightenAfterTimeouts: 2, }), ).toThrow(); }); - it('throws when `threshold` < 1', () => { + it('throws when `tightenAfterTimeouts` < 1', () => { expect( () => new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 0, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 0, }), ).toThrow(); expect( () => new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: -1, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: -1, }), ).toThrow(); }); @@ -87,9 +87,9 @@ describe('AdaptiveTimeoutPolicy', () => { expect( () => new AdaptiveTimeoutPolicy({ - default: 0, - short: -1, - threshold: 1, + defaultMs: 0, + tightenedMs: -1, + tightenAfterTimeouts: 1, }), ).toThrow(); }); @@ -98,18 +98,18 @@ describe('AdaptiveTimeoutPolicy', () => { describe('state machine', () => { it('returns default before any events', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 2, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 2, }); expect(policy.beforeRequest({ endpoint: 
endpointA })).toBe(1000); }); - it('tightens after exactly threshold=1 consecutive timeouts', () => { + it('tightens after exactly tightenAfterTimeouts=1 consecutive timeouts', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 1, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 1, }); expect(policy.beforeRequest({ endpoint: endpointA })).toBe(1000); policy.afterRequest({ @@ -120,11 +120,11 @@ describe('AdaptiveTimeoutPolicy', () => { expect(policy.beforeRequest({ endpoint: endpointA })).toBe(100); }); - it('tightens after exactly threshold=2 consecutive timeouts', () => { + it('tightens after exactly tightenAfterTimeouts=2 consecutive timeouts', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 2, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 2, }); policy.afterRequest({ endpoint: endpointA, @@ -140,11 +140,11 @@ describe('AdaptiveTimeoutPolicy', () => { expect(policy.beforeRequest({ endpoint: endpointA })).toBe(100); }); - it('tightens after exactly threshold=3 consecutive timeouts', () => { + it('tightens after exactly tightenAfterTimeouts=3 consecutive timeouts', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 3, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 3, }); policy.afterRequest({ endpoint: endpointA, @@ -167,9 +167,9 @@ describe('AdaptiveTimeoutPolicy', () => { it('stays tightened on further timeouts', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 1, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 1, }); for (let i = 0; i < 5; i++) { policy.afterRequest({ @@ -183,9 +183,9 @@ describe('AdaptiveTimeoutPolicy', () => { it('relaxes to default on a single ok', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 1, + defaultMs: 1000, + tightenedMs: 100, + 
tightenAfterTimeouts: 1, }); policy.afterRequest({ endpoint: endpointA, @@ -203,9 +203,9 @@ describe('AdaptiveTimeoutPolicy', () => { it('resets the counter on ok so subsequent timeouts must accumulate again', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 2, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 2, }); policy.afterRequest({ endpoint: endpointA, @@ -227,9 +227,9 @@ describe('AdaptiveTimeoutPolicy', () => { it('treats `error` outcomes as neutral (neither tighten nor relax)', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 1, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 1, }); policy.afterRequest({ endpoint: endpointA, @@ -254,9 +254,9 @@ describe('AdaptiveTimeoutPolicy', () => { it('isolates state per endpoint', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 1, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 1, }); policy.afterRequest({ endpoint: endpointA, @@ -271,9 +271,9 @@ describe('AdaptiveTimeoutPolicy', () => { describe('transition events', () => { it('emits onTighten when state flips to tightened', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 2, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 2, }); const onTighten = vi.fn(); const onRelax = vi.fn(); @@ -303,9 +303,9 @@ describe('AdaptiveTimeoutPolicy', () => { it('does not re-emit onTighten while already tightened', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 1, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 1, }); const onTighten = vi.fn(); policy.subscribe({ onTighten }); @@ -329,9 +329,9 @@ describe('AdaptiveTimeoutPolicy', () => { it('emits onRelax when an ok arrives in tightened state', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - 
threshold: 1, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 1, }); const onRelax = vi.fn(); policy.subscribe({ onRelax }); @@ -357,9 +357,9 @@ describe('AdaptiveTimeoutPolicy', () => { it('does not emit onRelax when ok arrives in healthy state', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 1, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 1, }); const onRelax = vi.fn(); policy.subscribe({ onRelax }); @@ -373,9 +373,9 @@ describe('AdaptiveTimeoutPolicy', () => { it('unsubscribe stops further notifications', () => { const policy = new AdaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 1, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 1, }); const onTighten = vi.fn(); const unsubscribe = policy.subscribe({ onTighten }); @@ -393,9 +393,9 @@ describe('AdaptiveTimeoutPolicy', () => { describe('adaptiveTimeoutPolicy factory', () => { it('returns a factory that builds independent AdaptiveTimeoutPolicy instances', () => { const factory = adaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 2, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 2, }); const a = factory(); const b = factory(); @@ -405,9 +405,9 @@ describe('adaptiveTimeoutPolicy factory', () => { it('isolates state across factory invocations', () => { const factory = adaptiveTimeoutPolicy({ - default: 1000, - short: 100, - threshold: 1, + defaultMs: 1000, + tightenedMs: 100, + tightenAfterTimeouts: 1, }); const a = factory(); a.afterRequest({ From fd4b3f1010b71777a4de71851d78ace75526a97b Mon Sep 17 00:00:00 2001 From: David de Boer Date: Thu, 28 May 2026 14:44:03 +0200 Subject: [PATCH 4/6] refactor(pipeline): rename TimeoutPolicy option to `timeout` - PipelineOptions.timeout (factory) - ExecuteOptions.timeout / SelectOptions.timeout / RunOptions.timeout - SparqlConstructExecutorOptions.timeout / SparqlItemSelectorOptions.timeout - Internal Pipeline.timeoutFactory field 
follows suit The option's purpose is configuring the timeout; the value's *type* is TimeoutPolicy. Naming the field after its purpose is shorter, less stuttery, and reads cleaner at call sites: timeout: adaptiveTimeoutPolicy({...}) Also expands the README's adaptive-timeouts section to define the healthy/tightened states upfront and tabulate outcome classification. --- .../src/sampleStages.ts | 2 +- packages/pipeline-void/src/stage.ts | 6 ++-- packages/pipeline/README.md | 27 ++++++++++---- packages/pipeline/src/pipeline.ts | 35 ++++++++----------- packages/pipeline/src/sparql/executor.ts | 13 ++++--- packages/pipeline/src/sparql/selector.ts | 9 +++-- packages/pipeline/src/sparql/timeoutPolicy.ts | 14 ++++---- packages/pipeline/src/stage.ts | 16 ++++----- packages/pipeline/test/pipeline.test.ts | 18 +++++----- .../pipeline/test/sparql/executor.test.ts | 16 ++++----- .../pipeline/test/sparql/selector.test.ts | 8 ++--- 11 files changed, 86 insertions(+), 78 deletions(-) diff --git a/packages/pipeline-shacl-sampler/src/sampleStages.ts b/packages/pipeline-shacl-sampler/src/sampleStages.ts index 1759e5b0..203a8ed4 100644 --- a/packages/pipeline-shacl-sampler/src/sampleStages.ts +++ b/packages/pipeline-shacl-sampler/src/sampleStages.ts @@ -138,7 +138,7 @@ export async function shaclSampleStages( ), executors: new SparqlConstructExecutor({ query: buildSampleQuery(shape), - timeoutPolicy: new ConstantTimeoutPolicy(timeout), + timeout: new ConstantTimeoutPolicy(timeout), }), batchSize, maxConcurrency, diff --git a/packages/pipeline-void/src/stage.ts b/packages/pipeline-void/src/stage.ts index 2d6a9985..cfe63279 100644 --- a/packages/pipeline-void/src/stage.ts +++ b/packages/pipeline-void/src/stage.ts @@ -70,7 +70,7 @@ async function createVoidStage( options?.executor?.(query) ?? new SparqlConstructExecutor({ query, - timeoutPolicy: new ConstantTimeoutPolicy(options?.timeout ?? 60_000), + timeout: new ConstantTimeoutPolicy(options?.timeout ?? 
60_000), }); if (options?.perClass) { @@ -210,7 +210,7 @@ export function uriSpaces( new UriSpaceExecutor( new SparqlConstructExecutor({ query, - timeoutPolicy: new ConstantTimeoutPolicy(options?.timeout ?? 60_000), + timeout: new ConstantTimeoutPolicy(options?.timeout ?? 60_000), }), uriSpaceMap, ), @@ -231,7 +231,7 @@ export function detectVocabularies( new VocabularyExecutor( new SparqlConstructExecutor({ query, - timeoutPolicy: new ConstantTimeoutPolicy(options?.timeout ?? 60_000), + timeout: new ConstantTimeoutPolicy(options?.timeout ?? 60_000), }), options?.vocabularies ? [...defaultVocabularies, ...options.vocabularies] diff --git a/packages/pipeline/README.md b/packages/pipeline/README.md index 2b7de9bd..f765596c 100644 --- a/packages/pipeline/README.md +++ b/packages/pipeline/README.md @@ -210,24 +210,39 @@ This keeps SPARQL doing the heavy lifting while TypeScript handles the edge case #### Adaptive timeouts -By default, every SPARQL request uses the same 5-minute budget. When a pipeline runs against many third-party endpoints, that fixed budget can cost ~80 minutes on a single dataset whose endpoint times out repeatedly on heavy queries. Pass a `TimeoutPolicy` factory to `Pipeline` to fast-fail once an endpoint has shown a run of consecutive timeouts: +By default, every SPARQL request uses the same 5-minute budget. When a pipeline runs against many third-party endpoints, that fixed budget can cost ~80 minutes on a single dataset whose endpoint times out repeatedly on heavy queries — light stages on the same endpoint then sit behind the heavy ones that will never succeed. + +A `TimeoutPolicy` decides the budget for each SPARQL request and observes the outcome. Two are built in: + +- **`ConstantTimeoutPolicy(timeoutMs)`** – returns the same budget for every request. The implicit default when `PipelineOptions.timeout` is omitted (`constantTimeoutPolicy(300_000)`). 
+- **`AdaptiveTimeoutPolicy({ defaultMs, tightenedMs, tightenAfterTimeouts })`** – per-endpoint state machine. Each endpoint is either _healthy_ (use `defaultMs`) or _tightened_ (use `tightenedMs`). After `tightenAfterTimeouts` consecutive `timeout` outcomes the endpoint flips to _tightened_; a single `ok` flips it back to _healthy_. + +`PipelineOptions.timeout` accepts a `() => TimeoutPolicy` factory. The pipeline invokes it once per dataset, so policy state resets between datasets and one bad dataset can’t poison the next: ```typescript import { adaptiveTimeoutPolicy } from '@lde/pipeline'; new Pipeline({ // … - timeoutPolicy: adaptiveTimeoutPolicy({ + timeout: adaptiveTimeoutPolicy({ defaultMs: 300_000, // 5 min while the endpoint is healthy - tightenedMs: 10_000, // 10 s once the endpoint is flipped to tightened - tightenAfterTimeouts: 2, // flip after 2 consecutive timeouts + tightenedMs: 10_000, // 10 s once the endpoint is tightened + tightenAfterTimeouts: 2, // flip to tightened after 2 consecutive timeouts }), }); ``` -The factory is invoked once per dataset, so policy state resets between datasets. HTTP 504 from the upstream and client-side `AbortSignal` timeouts both count as `timeout` outcomes; a single successful request relaxes the endpoint back to the default budget. Subscribe-capable policies forward transitions to the `ProgressReporter` via `timeoutTightened` / `timeoutRelaxed` — `ConsoleReporter` prints them as `↘ Tightened` / `↗ Relaxed` lines so operators can distinguish a fast-failed stage from an unexpected speedup. 
+Outcomes are classified as: + +| outcome | source | +| --------- | ------------------------------------------------------------------------ | +| `ok` | the request resolved | +| `timeout` | client-side `AbortSignal.timeout()` fired, or upstream returned HTTP 504 | +| `error` | anything else (other HTTP errors, parse errors, …) – neutral | + +Transitions are forwarded to the `ProgressReporter` via `timeoutTightened` / `timeoutRelaxed`; `ConsoleReporter` prints them as `↘ Tightened` / `↗ Relaxed` lines so operators can tell a fast-failed stage from an unexpected speedup. -Omit `timeoutPolicy` to keep today’s behaviour (`constantTimeoutPolicy(300_000)`). +Implement `TimeoutPolicy` directly for custom strategies (closing over shared state in the factory if you want it to span datasets). ### Validation diff --git a/packages/pipeline/src/pipeline.ts b/packages/pipeline/src/pipeline.ts index cd04142a..43906810 100644 --- a/packages/pipeline/src/pipeline.ts +++ b/packages/pipeline/src/pipeline.ts @@ -58,7 +58,7 @@ export interface PipelineOptions { * {@link TimeoutPolicy} instance, and the Pipeline invokes the factory * once per dataset so state resets between datasets. */ - timeoutPolicy?: () => TimeoutPolicy; + timeout?: () => TimeoutPolicy; } /** @@ -147,7 +147,7 @@ export class Pipeline { private readonly distributionResolver: DistributionResolver; private readonly chaining?: PipelineOptions['chaining']; private readonly reporter?: ProgressReporter; - private readonly timeoutPolicyFactory: () => TimeoutPolicy; + private readonly timeoutFactory: () => TimeoutPolicy; constructor(options: PipelineOptions) { const hasSubStages = options.stages.some( @@ -179,8 +179,8 @@ export class Pipeline { options.distributionResolver ?? new SparqlDistributionResolver(); this.chaining = options.chaining; this.reporter = options.reporter; - this.timeoutPolicyFactory = - options.timeoutPolicy ?? (() => new ConstantTimeoutPolicy(300_000)); + this.timeoutFactory = + options.timeout ?? 
(() => new ConstantTimeoutPolicy(300_000)); } async run(): Promise { @@ -206,8 +206,8 @@ export class Pipeline { private async processDataset(dataset: Dataset): Promise { this.reporter?.datasetStart?.(dataset); - const timeoutPolicy = this.timeoutPolicyFactory(); - const unsubscribe = timeoutPolicy.subscribe?.({ + const timeout: TimeoutPolicy = this.timeoutFactory(); + const unsubscribe = timeout.subscribe?.({ onTighten: (event) => this.reporter?.timeoutTightened?.(event), onRelax: (event) => this.reporter?.timeoutRelaxed?.(event), }); @@ -252,19 +252,14 @@ export class Pipeline { for (const stage of this.stages) { try { if (stage.stages.length > 0) { - await this.runChain( - dataset, - resolved.distribution, - stage, - timeoutPolicy, - ); + await this.runChain(dataset, resolved.distribution, stage, timeout); } else { await this.runStage( dataset, resolved.distribution, stage, this.writer, - timeoutPolicy, + timeout, ); } } catch (error) { @@ -315,7 +310,7 @@ export class Pipeline { distribution: Distribution, stage: Stage, writer: Writer = this.writer, - timeoutPolicy?: TimeoutPolicy, + timeout?: TimeoutPolicy, ): Promise { this.reporter?.stageStart?.(stage.name); const stageStart = Date.now(); @@ -335,7 +330,7 @@ export class Pipeline { heapUsedBytes: stageMemory.heapUsed, }); }, - timeoutPolicy, + timeout, }); if (result instanceof NotSupported) { @@ -358,14 +353,14 @@ export class Pipeline { distribution: Distribution, stage: Stage, writer: Writer, - timeoutPolicy?: TimeoutPolicy, + timeout?: TimeoutPolicy, ): Promise { const supported = await this.runStage( dataset, distribution, stage, writer, - timeoutPolicy, + timeout, ); if (!supported) { throw new Error( @@ -378,7 +373,7 @@ export class Pipeline { dataset: Dataset, distribution: Distribution, stage: Stage, - timeoutPolicy?: TimeoutPolicy, + timeout?: TimeoutPolicy, ): Promise { const { stageOutputResolver, outputDir } = this.chaining!; const outputFiles: string[] = []; @@ -395,7 +390,7 @@ export class 
Pipeline { distribution, stage, parentWriter, - timeoutPolicy, + timeout, ); outputFiles.push(parentWriter.getOutputPath(dataset)); @@ -415,7 +410,7 @@ export class Pipeline { currentDistribution, child, childWriter, - timeoutPolicy, + timeout, ); outputFiles.push(childWriter.getOutputPath(dataset)); diff --git a/packages/pipeline/src/sparql/executor.ts b/packages/pipeline/src/sparql/executor.ts index c535307f..946e0572 100644 --- a/packages/pipeline/src/sparql/executor.ts +++ b/packages/pipeline/src/sparql/executor.ts @@ -45,7 +45,7 @@ export interface ExecuteOptions { * Overrides the executor-level policy passed at construction time. * Pipeline runners use this to thread the per-dataset policy through. */ - timeoutPolicy?: TimeoutPolicy; + timeout?: TimeoutPolicy; } export interface Executor { @@ -70,13 +70,13 @@ export interface SparqlConstructExecutorOptions { * `new ConstantTimeoutPolicy(300_000)` so callers that supply nothing get * the same 5-minute budget as before — but expressed through the new * {@link TimeoutPolicy} surface so an adaptive policy can be slotted in - * via {@link ExecuteOptions.timeoutPolicy} without changing this default. + * via {@link ExecuteOptions.timeout} without changing this default. * * Replaces the old `timeout: number` option. Call sites passing * `timeout: 5000` migrate to - * `timeoutPolicy: constantTimeoutPolicy(5_000)()`. + * `timeout: constantTimeoutPolicy(5_000)()`. */ - timeoutPolicy?: TimeoutPolicy; + timeout?: TimeoutPolicy; /** * Number of retries for transient errors (network failures and HTTP 502/503/504). @@ -180,8 +180,7 @@ export class SparqlConstructExecutor implements Executor { } this.userFetcher = options.fetcher; - this.defaultPolicy = - options.timeoutPolicy ?? new ConstantTimeoutPolicy(300_000); + this.defaultPolicy = options.timeout ?? 
new ConstantTimeoutPolicy(300_000); } /** @@ -227,7 +226,7 @@ export class SparqlConstructExecutor implements Executor { assertSafeIri(dataset.iri.toString()); query = query.replaceAll('?dataset', `<${dataset.iri}>`); - const policy = options?.timeoutPolicy ?? this.defaultPolicy; + const policy = options?.timeout ?? this.defaultPolicy; const endpointUrl = endpoint; const quads = await pRetry( diff --git a/packages/pipeline/src/sparql/selector.ts b/packages/pipeline/src/sparql/selector.ts index fdc7bedf..631a3065 100644 --- a/packages/pipeline/src/sparql/selector.ts +++ b/packages/pipeline/src/sparql/selector.ts @@ -48,10 +48,10 @@ export interface SparqlItemSelectorOptions { * `new ConstantTimeoutPolicy(300_000)` so callers that supply nothing * keep today’s 5-minute budget. * - * Overridden by {@link SelectOptions.timeoutPolicy} when the Pipeline + * Overridden by {@link SelectOptions.timeout} when the Pipeline * threads a per-dataset policy through. */ - timeoutPolicy?: TimeoutPolicy; + timeout?: TimeoutPolicy; } /** @@ -96,8 +96,7 @@ export class SparqlItemSelector implements ItemSelector { this.queryLimit = this.parsed.solutionModifiers.limitOffset?.limit; this.maxResults = options.maxResults; this.userFetcher = options.fetcher; - this.defaultPolicy = - options.timeoutPolicy ?? new ConstantTimeoutPolicy(300_000); + this.defaultPolicy = options.timeout ?? new ConstantTimeoutPolicy(300_000); } async *select( @@ -108,7 +107,7 @@ export class SparqlItemSelector implements ItemSelector { if (this.maxResults === 0) return; const basePageSize = this.queryLimit ?? batchSize ?? 10; const endpoint = distribution.accessUrl!; - const policy = options?.timeoutPolicy ?? this.defaultPolicy; + const policy = options?.timeout ?? 
this.defaultPolicy; let offset = 0; let totalYielded = 0; diff --git a/packages/pipeline/src/sparql/timeoutPolicy.ts b/packages/pipeline/src/sparql/timeoutPolicy.ts index 1dc9686d..a95a758b 100644 --- a/packages/pipeline/src/sparql/timeoutPolicy.ts +++ b/packages/pipeline/src/sparql/timeoutPolicy.ts @@ -81,9 +81,9 @@ export interface TimeoutTransitionEvent { * its budget for an endpoint. Both hooks are optional. */ export interface TimeoutPolicyObserver { - /** Called when the policy starts using the short budget for an endpoint. */ + /** Called when the policy flips an endpoint to the tightened budget. */ onTighten?(event: TimeoutTransitionEvent): void; - /** Called when the policy returns to the default budget for an endpoint. */ + /** Called when the policy relaxes an endpoint back to the default budget. */ onRelax?(event: TimeoutTransitionEvent): void; } @@ -133,9 +133,9 @@ interface EndpointState { } /** - * Adaptive per-endpoint policy: after a configurable threshold of - * consecutive timeouts on the same endpoint, subsequent requests use a - * shorter budget so the pipeline fast-fails instead of waiting out the + * Adaptive per-endpoint policy: after {@link AdaptiveTimeoutPolicyOptions.tightenAfterTimeouts} + * consecutive timeouts on the same endpoint, subsequent requests use the + * tightened budget so the pipeline fast-fails instead of waiting out the * full default budget. A single successful request relaxes the endpoint * back to the default budget. * @@ -253,7 +253,7 @@ export class AdaptiveTimeoutPolicy implements TimeoutPolicy { /** * Factory returning a fresh {@link ConstantTimeoutPolicy} on every call. - * Pass this to {@link PipelineOptions.timeoutPolicy}. + * Pass this to {@link PipelineOptions.timeout}. */ export function constantTimeoutPolicy( timeoutMs: number, @@ -267,7 +267,7 @@ export function constantTimeoutPolicy( /** * Factory returning a fresh {@link AdaptiveTimeoutPolicy} on every call. 
- * Pass this to {@link PipelineOptions.timeoutPolicy}; the Pipeline invokes + * Pass this to {@link PipelineOptions.timeout}; the Pipeline invokes * the factory once per dataset so state resets between datasets. */ export function adaptiveTimeoutPolicy( diff --git a/packages/pipeline/src/stage.ts b/packages/pipeline/src/stage.ts index 131434d1..fc616337 100644 --- a/packages/pipeline/src/stage.ts +++ b/packages/pipeline/src/stage.ts @@ -55,13 +55,13 @@ export interface RunOptions { * dataset), so a single policy instance covers all stages and child * stages within one dataset. */ - timeoutPolicy?: TimeoutPolicy; + timeout?: TimeoutPolicy; } /** Options accepted by {@link ItemSelector.select}. */ export interface SelectOptions { /** Per-call timeout policy. */ - timeoutPolicy?: TimeoutPolicy; + timeout?: TimeoutPolicy; } export class Stage { @@ -96,11 +96,11 @@ export class Stage { writer: Writer, options?: RunOptions, ): Promise { - const timeoutPolicy = options?.timeoutPolicy; + const timeout = options?.timeout; if (this.itemSelector) { return this.runWithSelector( this.itemSelector.select(distribution, this.batchSize, { - timeoutPolicy, + timeout, }), dataset, distribution, @@ -109,7 +109,7 @@ export class Stage { ); } - const streams = await this.executeAll(dataset, distribution, timeoutPolicy); + const streams = await this.executeAll(dataset, distribution, timeout); if (streams instanceof NotSupported) { return streams; } @@ -225,7 +225,7 @@ export class Stage { this.executors.map(async (executor) => { const result = await executor.execute(dataset, distribution, { bindings, - timeoutPolicy: options?.timeoutPolicy, + timeout: options?.timeout, }); if (result instanceof NotSupported) return []; hasResults = true; @@ -328,11 +328,11 @@ export class Stage { private async executeAll( dataset: Dataset, distribution: Distribution, - timeoutPolicy: TimeoutPolicy | undefined, + timeout: TimeoutPolicy | undefined, ): Promise[] | NotSupported> { const results = await 
Promise.all( this.executors.map((executor) => - executor.execute(dataset, distribution, { timeoutPolicy }), + executor.execute(dataset, distribution, { timeout }), ), ); diff --git a/packages/pipeline/test/pipeline.test.ts b/packages/pipeline/test/pipeline.test.ts index 34324fa5..62c317e5 100644 --- a/packages/pipeline/test/pipeline.test.ts +++ b/packages/pipeline/test/pipeline.test.ts @@ -1143,7 +1143,7 @@ describe('Pipeline', () => { }); }); - describe('timeoutPolicy', () => { + describe('timeout', () => { it('passes a TimeoutPolicy instance to each stage.run', async () => { const stage = makeStage('stage1'); @@ -1158,9 +1158,9 @@ describe('Pipeline', () => { const call = (stage.run as ReturnType).mock.calls[0]; const options = call[3]; - expect(options.timeoutPolicy).toBeDefined(); - expect(typeof options.timeoutPolicy.beforeRequest).toBe('function'); - expect(typeof options.timeoutPolicy.afterRequest).toBe('function'); + expect(options.timeout).toBeDefined(); + expect(typeof options.timeout.beforeRequest).toBe('function'); + expect(typeof options.timeout.afterRequest).toBe('function'); }); it('invokes the factory once per dataset', async () => { @@ -1177,7 +1177,7 @@ describe('Pipeline', () => { stages: [makeStage('stage1')], writers: writer, distributionResolver: makeResolver(makeResolvedDistribution()), - timeoutPolicy: factory, + timeout: factory, }); await pipeline.run(); @@ -1205,14 +1205,14 @@ describe('Pipeline', () => { stages: [stage], writers: writer, distributionResolver: makeResolver(makeResolvedDistribution()), - timeoutPolicy: factory, + timeout: factory, }); await pipeline.run(); const runCalls = (stage.run as ReturnType).mock.calls; - expect(runCalls[0][3].timeoutPolicy).toBe(policies[0]); - expect(runCalls[1][3].timeoutPolicy).toBe(policies[1]); + expect(runCalls[0][3].timeout).toBe(policies[0]); + expect(runCalls[1][3].timeout).toBe(policies[1]); expect(policies[0]).not.toBe(policies[1]); }); @@ -1250,7 +1250,7 @@ describe('Pipeline', () => { 
stages: [makeStage('stage1')], writers: writer, distributionResolver: makeResolver(makeResolvedDistribution()), - timeoutPolicy: factory, + timeout: factory, reporter, }); diff --git a/packages/pipeline/test/sparql/executor.test.ts b/packages/pipeline/test/sparql/executor.test.ts index da53edc8..22488273 100644 --- a/packages/pipeline/test/sparql/executor.test.ts +++ b/packages/pipeline/test/sparql/executor.test.ts @@ -661,7 +661,7 @@ describe('SparqlConstructExecutor', () => { const distribution = makeDistribution(); await executor.execute(makeDataset(distribution), distribution, { - timeoutPolicy: policy, + timeout: policy, }); expect(policy.beforeRequest).toHaveBeenCalledTimes(1); @@ -695,7 +695,7 @@ describe('SparqlConstructExecutor', () => { await expect( executor.execute(makeDataset(distribution), distribution, { - timeoutPolicy: policy, + timeout: policy, }), ).rejects.toThrow('504'); @@ -720,7 +720,7 @@ describe('SparqlConstructExecutor', () => { await expect( executor.execute(makeDataset(distribution), distribution, { - timeoutPolicy: policy, + timeout: policy, }), ).rejects.toThrow(); @@ -745,7 +745,7 @@ describe('SparqlConstructExecutor', () => { await expect( executor.execute(makeDataset(distribution), distribution, { - timeoutPolicy: policy, + timeout: policy, }), ).rejects.toThrow(); @@ -772,7 +772,7 @@ describe('SparqlConstructExecutor', () => { await expect( executor.execute(makeDataset(distribution), distribution, { - timeoutPolicy: policy, + timeout: policy, }), ).rejects.toThrow(); @@ -799,7 +799,7 @@ describe('SparqlConstructExecutor', () => { const distribution = makeDistribution(); await executor.execute(makeDataset(distribution), distribution, { - timeoutPolicy: policy, + timeout: policy, }); expect(policy.beforeRequest).toHaveBeenCalledTimes(2); @@ -822,7 +822,7 @@ describe('SparqlConstructExecutor', () => { const executor = new SparqlConstructExecutor({ query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }', fetcher, - timeoutPolicy: policy, + 
timeout: policy, }); const distribution = makeDistribution(); @@ -864,7 +864,7 @@ describe('SparqlConstructExecutor', () => { await expect( executor.execute(makeDataset(distribution), distribution, { - timeoutPolicy: policy, + timeout: policy, }), ).rejects.toThrow(); diff --git a/packages/pipeline/test/sparql/selector.test.ts b/packages/pipeline/test/sparql/selector.test.ts index 558fa0af..340b136b 100644 --- a/packages/pipeline/test/sparql/selector.test.ts +++ b/packages/pipeline/test/sparql/selector.test.ts @@ -533,7 +533,7 @@ describe('SparqlItemSelector', () => { const rows: VariableBindings[] = []; for await (const row of selector.select(distribution, 2, { - timeoutPolicy: policy, + timeout: policy, })) { rows.push(row); } @@ -568,7 +568,7 @@ describe('SparqlItemSelector', () => { const iterate = async () => { for await (const _row of selector.select(distribution, 10, { - timeoutPolicy: policy, + timeout: policy, })) { // consume } @@ -599,7 +599,7 @@ describe('SparqlItemSelector', () => { const iterate = async () => { for await (const _row of selector.select(distribution, 10, { - timeoutPolicy: policy, + timeout: policy, })) { // consume } @@ -620,7 +620,7 @@ describe('SparqlItemSelector', () => { const selector = new SparqlItemSelector({ query, fetcher: mockFetcher as never, - timeoutPolicy: policy, + timeout: policy, }); for await (const _row of selector.select(distribution, 10)) { From 785ccbf8b314805affd031f73907e78768ffde2c Mon Sep 17 00:00:00 2001 From: David de Boer Date: Thu, 28 May 2026 15:05:10 +0200 Subject: [PATCH 5/6] refactor(pipeline): remove executor/selector-level `timeout` option Timeouts now live exclusively at the Pipeline level. Drop: - SparqlConstructExecutorOptions.timeout - SparqlItemSelectorOptions.timeout - VoidStageOptions.timeout (would have been silently overridden) - ShaclSampleStagesOptions.timeout (same) The fallback budget when no PipelineOptions.timeout is supplied is now a module-level ConstantTimeoutPolicy(300_000). 
The old executor/selector options were redundant: at runtime, Stage always forwarded the per-dataset policy from Pipeline, which silently replaced any executor-author ceiling. Removing the option fixes that footgun and pushes operators toward the right altitude for timeout configuration. --- .../src/sampleStages.ts | 8 ----- .../pipeline-shacl-sampler/vite.config.ts | 6 ++-- packages/pipeline-void/src/stage.ts | 31 ++++++------------- packages/pipeline-void/vite.config.ts | 2 +- packages/pipeline/README.md | 2 ++ packages/pipeline/src/sparql/executor.ts | 27 ++++++---------- packages/pipeline/src/sparql/selector.ts | 20 +++++------- .../pipeline/test/sparql/executor.test.ts | 13 ++++---- .../pipeline/test/sparql/selector.test.ts | 16 +++++----- packages/pipeline/vite.config.ts | 4 +-- 10 files changed, 49 insertions(+), 80 deletions(-) diff --git a/packages/pipeline-shacl-sampler/src/sampleStages.ts b/packages/pipeline-shacl-sampler/src/sampleStages.ts index 203a8ed4..e79b6d24 100644 --- a/packages/pipeline-shacl-sampler/src/sampleStages.ts +++ b/packages/pipeline-shacl-sampler/src/sampleStages.ts @@ -1,5 +1,4 @@ import { - ConstantTimeoutPolicy, Stage, SparqlConstructExecutor, SparqlItemSelector, @@ -51,11 +50,6 @@ export interface ShaclSampleStagesOptions { * @default 50 */ samplesPerClass?: number; - /** - * SPARQL query timeout in milliseconds. - * @default 60000 - */ - timeout?: number; /** * Maximum number of sampled subjects per executor call. Defaults to * {@link samplesPerClass} so the whole sample fits in one CONSTRUCT @@ -112,7 +106,6 @@ export async function shaclSampleStages( options: ShaclSampleStagesOptions, ): Promise { const samplesPerClass = options.samplesPerClass ?? 50; - const timeout = options.timeout ?? 60_000; const batchSize = options.batchSize ?? samplesPerClass; const maxConcurrency = options.maxConcurrency; const namespaceAliases = options.namespaceAliases ?? 
[]; @@ -138,7 +131,6 @@ export async function shaclSampleStages( ), executors: new SparqlConstructExecutor({ query: buildSampleQuery(shape), - timeout: new ConstantTimeoutPolicy(timeout), }), batchSize, maxConcurrency, diff --git a/packages/pipeline-shacl-sampler/vite.config.ts b/packages/pipeline-shacl-sampler/vite.config.ts index 4a494bf9..c42c1ae9 100644 --- a/packages/pipeline-shacl-sampler/vite.config.ts +++ b/packages/pipeline-shacl-sampler/vite.config.ts @@ -11,9 +11,9 @@ export default mergeConfig( thresholds: { autoUpdate: true, functions: 96.77, - lines: 97.38, - branches: 89.87, - statements: 95.18, + lines: 97.36, + branches: 89.61, + statements: 95.15, }, }, }, diff --git a/packages/pipeline-void/src/stage.ts b/packages/pipeline-void/src/stage.ts index cfe63279..096e3814 100644 --- a/packages/pipeline-void/src/stage.ts +++ b/packages/pipeline-void/src/stage.ts @@ -1,5 +1,4 @@ import { - ConstantTimeoutPolicy, Stage, SparqlConstructExecutor, SparqlItemSelector, @@ -25,11 +24,14 @@ const queriesDir = resolve( /** * Options for configuring VoID stage execution. + * + * Per-request timeouts are configured at the {@link Pipeline} level via + * `PipelineOptions.timeout`; VoID stages no longer expose their own timeout + * knob. Kept as a named type so per-class / per-stages option types can + * extend it as more knobs are added. */ -export interface VoidStageOptions { - /** SPARQL query timeout in milliseconds. @default 60000 */ - timeout?: number; -} +// eslint-disable-next-line @typescript-eslint/no-empty-interface, @typescript-eslint/no-empty-object-type +export interface VoidStageOptions {} /** * Options for per-class VoID stages that iterate over classes. @@ -67,11 +69,7 @@ async function createVoidStage( ): Promise { const query = await readQueryFile(resolve(queriesDir, filename)); const executor = - options?.executor?.(query) ?? - new SparqlConstructExecutor({ - query, - timeout: new ConstantTimeoutPolicy(options?.timeout ?? 
60_000), - }); + options?.executor?.(query) ?? new SparqlConstructExecutor({ query }); if (options?.perClass) { return new Stage({ @@ -207,13 +205,7 @@ export function uriSpaces( return createVoidStage('object-uri-space.rq', { ...options, executor: (query) => - new UriSpaceExecutor( - new SparqlConstructExecutor({ - query, - timeout: new ConstantTimeoutPolicy(options?.timeout ?? 60_000), - }), - uriSpaceMap, - ), + new UriSpaceExecutor(new SparqlConstructExecutor({ query }), uriSpaceMap), }); } @@ -229,10 +221,7 @@ export function detectVocabularies( ...options, executor: (query) => new VocabularyExecutor( - new SparqlConstructExecutor({ - query, - timeout: new ConstantTimeoutPolicy(options?.timeout ?? 60_000), - }), + new SparqlConstructExecutor({ query }), options?.vocabularies ? [...defaultVocabularies, ...options.vocabularies] : undefined, diff --git a/packages/pipeline-void/vite.config.ts b/packages/pipeline-void/vite.config.ts index 29f6a1eb..e3a10ca6 100644 --- a/packages/pipeline-void/vite.config.ts +++ b/packages/pipeline-void/vite.config.ts @@ -12,7 +12,7 @@ export default mergeConfig( thresholds: { functions: 50, lines: 78.43, - branches: 63.26, + branches: 67.44, statements: 78.84, }, }, diff --git a/packages/pipeline/README.md b/packages/pipeline/README.md index f765596c..a606b5c3 100644 --- a/packages/pipeline/README.md +++ b/packages/pipeline/README.md @@ -244,6 +244,8 @@ Transitions are forwarded to the `ProgressReporter` via `timeoutTightened` / `ti Implement `TimeoutPolicy` directly for custom strategies (closing over shared state in the factory if you want it to span datasets). +Timeouts live at the pipeline level — neither `SparqlConstructExecutor` nor `SparqlItemSelector` accept their own `timeout` option. Per-endpoint state belongs in the adaptive policy, and per-stage budgets aren’t supported. Reusable stage facades (`@lde/pipeline-void`, `@lde/pipeline-shacl-sampler`) follow the same convention. 
+ ### Validation Stages can optionally validate their output quads against a `Validator`. Validation operates on the **combined output of all executors per batch**, not on individual quads or per-executor output. A batch produces a complete result set — a self-contained cluster of linked resources — that can be meaningfully matched against SHACL shapes. Even with a single executor, each batch is a complete unit; with multiple executors, shapes that reference triples from different executors are validated correctly. diff --git a/packages/pipeline/src/sparql/executor.ts b/packages/pipeline/src/sparql/executor.ts index 946e0572..323f3872 100644 --- a/packages/pipeline/src/sparql/executor.ts +++ b/packages/pipeline/src/sparql/executor.ts @@ -19,6 +19,13 @@ import { type TimeoutPolicy, } from './timeoutPolicy.js'; +/** + * Fallback policy when no per-call `TimeoutPolicy` is supplied via + * {@link ExecuteOptions.timeout}. Pipeline always supplies one, so this only + * matters when the executor is driven directly (without a Pipeline). + */ +const defaultTimeoutPolicy: TimeoutPolicy = new ConstantTimeoutPolicy(300_000); + /** * An executor could not run because the dataset lacks a supported distribution. */ @@ -65,19 +72,6 @@ export interface SparqlConstructExecutorOptions { */ query: string; - /** - * Per-attempt timeout policy. Defaults to - * `new ConstantTimeoutPolicy(300_000)` so callers that supply nothing get - * the same 5-minute budget as before — but expressed through the new - * {@link TimeoutPolicy} surface so an adaptive policy can be slotted in - * via {@link ExecuteOptions.timeout} without changing this default. - * - * Replaces the old `timeout: number` option. Call sites passing - * `timeout: 5000` migrate to - * `timeout: constantTimeoutPolicy(5_000)()`. - */ - timeout?: TimeoutPolicy; - /** * Number of retries for transient errors (network failures and HTTP 502/503/504). 
* @default 3 @@ -159,7 +153,6 @@ export class SparqlConstructExecutor implements Executor { private readonly rawQuery: string; private readonly preParsed?: QueryConstruct; private readonly userFetcher?: SparqlEndpointFetcher; - private readonly defaultPolicy: TimeoutPolicy; private readonly retries: number; private readonly lineBuffer: boolean; private readonly deduplicate: boolean; @@ -180,7 +173,6 @@ export class SparqlConstructExecutor implements Executor { } this.userFetcher = options.fetcher; - this.defaultPolicy = options.timeout ?? new ConstantTimeoutPolicy(300_000); } /** @@ -226,11 +218,10 @@ export class SparqlConstructExecutor implements Executor { assertSafeIri(dataset.iri.toString()); query = query.replaceAll('?dataset', `<${dataset.iri}>`); - const policy = options?.timeout ?? this.defaultPolicy; - const endpointUrl = endpoint; + const policy = options?.timeout ?? defaultTimeoutPolicy; const quads = await pRetry( - () => this.fetchQuadsWithPolicy(endpointUrl, query, policy), + () => this.fetchQuadsWithPolicy(endpoint, query, policy), { retries: this.retries, shouldRetry: ({ error }) => isTransientError(error), diff --git a/packages/pipeline/src/sparql/selector.ts b/packages/pipeline/src/sparql/selector.ts index 631a3065..38d6fe4c 100644 --- a/packages/pipeline/src/sparql/selector.ts +++ b/packages/pipeline/src/sparql/selector.ts @@ -18,6 +18,13 @@ import { const transientStatusPattern = /HTTP status (\d+)/; +/** + * Fallback policy when no per-call `TimeoutPolicy` is supplied via + * {@link SelectOptions.timeout}. Pipeline always supplies one, so this only + * matters when the selector is driven directly (without a Pipeline). + */ +const defaultTimeoutPolicy: TimeoutPolicy = new ConstantTimeoutPolicy(300_000); + const parser = new Parser(); const generator = new Generator(); const F = new AstFactory(); @@ -43,15 +50,6 @@ export interface SparqlItemSelectorOptions { maxResults?: number; /** Custom fetcher instance. 
*/ fetcher?: SparqlEndpointFetcher; - /** - * Per-attempt timeout policy. Defaults to - * `new ConstantTimeoutPolicy(300_000)` so callers that supply nothing - * keep today’s 5-minute budget. - * - * Overridden by {@link SelectOptions.timeout} when the Pipeline - * threads a per-dataset policy through. - */ - timeout?: TimeoutPolicy; } /** @@ -77,7 +75,6 @@ export class SparqlItemSelector implements ItemSelector { private readonly queryLimit?: number; private readonly maxResults?: number; private readonly userFetcher?: SparqlEndpointFetcher; - private readonly defaultPolicy: TimeoutPolicy; constructor(options: SparqlItemSelectorOptions) { const parsed = parser.parse(options.query); @@ -96,7 +93,6 @@ export class SparqlItemSelector implements ItemSelector { this.queryLimit = this.parsed.solutionModifiers.limitOffset?.limit; this.maxResults = options.maxResults; this.userFetcher = options.fetcher; - this.defaultPolicy = options.timeout ?? new ConstantTimeoutPolicy(300_000); } async *select( @@ -107,7 +103,7 @@ export class SparqlItemSelector implements ItemSelector { if (this.maxResults === 0) return; const basePageSize = this.queryLimit ?? batchSize ?? 10; const endpoint = distribution.accessUrl!; - const policy = options?.timeout ?? this.defaultPolicy; + const policy = options?.timeout ?? 
defaultTimeoutPolicy; let offset = 0; let totalYielded = 0; diff --git a/packages/pipeline/test/sparql/executor.test.ts b/packages/pipeline/test/sparql/executor.test.ts index 22488273..bf1b7c86 100644 --- a/packages/pipeline/test/sparql/executor.test.ts +++ b/packages/pipeline/test/sparql/executor.test.ts @@ -814,22 +814,21 @@ describe('SparqlConstructExecutor', () => { ); }); - it('falls back to the executor-level policy when ExecuteOptions omits one', async () => { + it('uses a default policy when ExecuteOptions omits one', async () => { const fetcher = new SparqlEndpointFetcher(); vi.spyOn(fetcher, 'fetchTriples').mockResolvedValue([] as never); - const policy = recordingPolicy(); const executor = new SparqlConstructExecutor({ query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }', fetcher, - timeout: policy, }); const distribution = makeDistribution(); - await executor.execute(makeDataset(distribution), distribution); - - expect(policy.beforeRequest).toHaveBeenCalledTimes(1); - expect(policy.afterRequest).toHaveBeenCalledTimes(1); + // No policy supplied at construction or per call — the executor falls + // back to its module-level default and the request still completes. 
+ await expect( + executor.execute(makeDataset(distribution), distribution), + ).resolves.toBeDefined(); }); it('aborts the underlying fetch when the policy budget elapses', async () => { diff --git a/packages/pipeline/test/sparql/selector.test.ts b/packages/pipeline/test/sparql/selector.test.ts index 340b136b..53bb87eb 100644 --- a/packages/pipeline/test/sparql/selector.test.ts +++ b/packages/pipeline/test/sparql/selector.test.ts @@ -611,24 +611,24 @@ describe('SparqlItemSelector', () => { ); }); - it('falls back to the selector-level policy when select() omits one', async () => { + it('uses a default policy when select() omits one', async () => { const mockFetcher = { fetchBindings: vi.fn().mockImplementation(() => bindingsStream([])), }; - const policy = recordingPolicy(); const selector = new SparqlItemSelector({ query, fetcher: mockFetcher as never, - timeout: policy, }); - for await (const _row of selector.select(distribution, 10)) { - // consume + // No policy supplied at construction or per call — pagination still + // works against the module-level default policy. 
+ const rows: VariableBindings[] = []; + for await (const row of selector.select(distribution, 10)) { + rows.push(row); } - - expect(policy.beforeRequest).toHaveBeenCalledTimes(1); - expect(policy.afterRequest).toHaveBeenCalledTimes(1); + expect(rows).toHaveLength(0); + expect(mockFetcher.fetchBindings).toHaveBeenCalledTimes(1); }); }); }); diff --git a/packages/pipeline/vite.config.ts b/packages/pipeline/vite.config.ts index afc94101..c2f2ec7f 100644 --- a/packages/pipeline/vite.config.ts +++ b/packages/pipeline/vite.config.ts @@ -12,8 +12,8 @@ export default mergeConfig( thresholds: { autoUpdate: true, functions: 94.3, - lines: 93.67, - branches: 89.07, + lines: 93.66, + branches: 88.97, statements: 93.19, }, }, From a5d352a9742cd83e17b686455174d33d06616558 Mon Sep 17 00:00:00 2001 From: David de Boer Date: Thu, 28 May 2026 15:09:37 +0200 Subject: [PATCH 6/6] fix(pipeline-void, pipeline-shacl-sampler): forward SelectOptions through adapter selectors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit classSelector and subjectSelector wrapped SparqlItemSelector but dropped the third `SelectOptions` argument that Stage now threads through. As a result, the Pipeline's per-dataset TimeoutPolicy never reached selector requests for VoID class partitioning or SHACL subject sampling, so adaptive tightening silently ignored those calls. Also clarifies the JSDoc on SparqlConstructExecutorOptions.fetcher: a user-supplied fetcher bypasses the policy budget — the policy hooks still fire for outcome reporting, but adaptive tightening cannot apply. This option is intended for tests; most callers should leave it unset. 
--- .../src/sampleStages.ts | 7 ++++-- packages/pipeline-void/src/stage.ts | 7 ++++-- packages/pipeline/src/sparql/executor.ts | 25 +++++++++++-------- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/packages/pipeline-shacl-sampler/src/sampleStages.ts b/packages/pipeline-shacl-sampler/src/sampleStages.ts index e79b6d24..2ca161e0 100644 --- a/packages/pipeline-shacl-sampler/src/sampleStages.ts +++ b/packages/pipeline-shacl-sampler/src/sampleStages.ts @@ -146,7 +146,10 @@ function subjectSelector( ): ItemSelector { assertSafeIri(targetClass.value); return { - select(distribution, batchSize) { + // Forward `options` so the Pipeline’s per-dataset TimeoutPolicy + // reaches the inner SparqlItemSelector — without this the adaptive + // budget is silently bypassed for subject selection. + select(distribution, batchSize, options) { const query = buildSubjectSelectorQuery({ targetClass, subjectFilter: distribution.subjectFilter, @@ -156,7 +159,7 @@ function subjectSelector( return new SparqlItemSelector({ query, maxResults: limit, - }).select(distribution, batchSize); + }).select(distribution, batchSize, options); }, }; } diff --git a/packages/pipeline-void/src/stage.ts b/packages/pipeline-void/src/stage.ts index 096e3814..4a78a86e 100644 --- a/packages/pipeline-void/src/stage.ts +++ b/packages/pipeline-void/src/stage.ts @@ -88,7 +88,10 @@ async function createVoidStage( function classSelector(): ItemSelector { return { - select: (distribution, batchSize) => { + // Forward `options` so the Pipeline’s per-dataset TimeoutPolicy + // reaches the inner SparqlItemSelector — without this the adaptive + // budget is silently bypassed for class selection. + select: (distribution, batchSize, options) => { const subjectFilter = distribution.subjectFilter ?? 
''; let fromClause = ''; if (distribution.namedGraph) { @@ -104,7 +107,7 @@ function classSelector(): ItemSelector { return new SparqlItemSelector({ query: selectorQuery, - }).select(distribution, batchSize); + }).select(distribution, batchSize, options); }, }; } diff --git a/packages/pipeline/src/sparql/executor.ts b/packages/pipeline/src/sparql/executor.ts index 323f3872..1690a23c 100644 --- a/packages/pipeline/src/sparql/executor.ts +++ b/packages/pipeline/src/sparql/executor.ts @@ -81,14 +81,19 @@ export interface SparqlConstructExecutorOptions { /** * Optional custom SparqlEndpointFetcher instance. * - * When supplied, the executor uses this fetcher for every attempt. The - * per-attempt timeout from the policy is still applied via an - * {@link AbortSignal} on the underlying fetch (provided the supplied - * fetcher honours `init.signal`). + * When supplied, the executor uses this fetcher as-is for every attempt + * — the per-attempt timeout from the {@link TimeoutPolicy} is **not** + * enforced (the supplied fetcher’s own `timeout` governs). Policy + * `beforeRequest`/`afterRequest` hooks still fire so outcome + * classification works, but adaptive tightening cannot apply. * * When omitted, the executor builds a fresh * {@link SparqlEndpointFetcher} per attempt with the per-attempt timeout * baked in. + * + * This option is intended for tests (mocking `fetchTriples`) and + * advanced cases that need full control of the fetcher. Most callers + * should leave it unset. */ fetcher?: SparqlEndpointFetcher; @@ -269,12 +274,12 @@ export class SparqlConstructExecutor implements Executor { } /** - * Pick the fetcher to use for a single attempt. When a user-supplied - * fetcher is configured, it is used as-is — the policy timeout is still - * applied if the user's underlying fetch honours `init.signal`, since - * {@link SparqlEndpointFetcher} merges its own timeout signal with any - * caller-supplied one only via construction. 
We therefore wrap the - * user's fetcher only when no custom fetcher was provided. + * Pick the fetcher to use for a single attempt. A user-supplied fetcher + * is used as-is and its own timeout governs the request; the per-attempt + * policy budget is bypassed in that case (see the JSDoc on + * {@link SparqlConstructExecutorOptions.fetcher}). Otherwise a fresh + * {@link SparqlEndpointFetcher} is constructed per attempt with the + * policy-supplied timeout baked in. */ private fetcherForAttempt(timeoutMs: number): SparqlEndpointFetcher { if (this.userFetcher) return this.userFetcher;