Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions packages/pipeline-console-reporter/src/consoleReporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Dataset, Distribution } from '@lde/dataset';
import type {
DistributionAnalysisResult,
ProgressReporter,
TimeoutTransitionEvent,
ValidationReport,
} from '@lde/pipeline';
import chalk from 'chalk';
Expand Down Expand Up @@ -283,4 +284,24 @@ export class ConsoleReporter implements ProgressReporter {
)} ${chalk.dim(`(memory: ${formatBytes(result.memoryUsageBytes)} RSS, ${formatBytes(result.heapUsedBytes)} heap)`)}\n`,
);
}

timeoutTightened(event: TimeoutTransitionEvent): void {
this.printLine(
chalk.yellow('↘'),
`Tightened timeout for ${event.endpoint.toString()} to ${chalk.bold(
prettyMilliseconds(event.toTimeoutMs),
)} after ${chalk.bold(event.consecutiveTimeouts)} consecutive timeouts`,
2,
);
}

timeoutRelaxed(event: TimeoutTransitionEvent): void {
this.printLine(
chalk.green('↗'),
`Relaxed timeout for ${event.endpoint.toString()} back to ${chalk.bold(
prettyMilliseconds(event.toTimeoutMs),
)} after successful request`,
2,
);
}
}
36 changes: 36 additions & 0 deletions packages/pipeline-console-reporter/test/consoleReporter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,40 @@ describe('ConsoleReporter', () => {
expect(output).toContain('to http://localhost:7001/sparql');
});
});

describe('timeout transitions', () => {
it('prints a tightened-timeout line', () => {
const reporter = new ConsoleReporter();
const spy = vi.spyOn(process.stderr, 'write').mockReturnValue(true);

reporter.timeoutTightened({
endpoint: new URL('https://data.razu.nl/sparql'),
fromTimeoutMs: 300_000,
toTimeoutMs: 10_000,
consecutiveTimeouts: 2,
});

const output = spy.mock.calls.map((c) => String(c[0])).join('');
expect(output).toContain('Tightened');
expect(output).toContain('https://data.razu.nl/sparql');
expect(output).toContain('10s');
expect(output).toContain('2');
});

it('prints a relaxed-timeout line', () => {
const reporter = new ConsoleReporter();
const spy = vi.spyOn(process.stderr, 'write').mockReturnValue(true);

reporter.timeoutRelaxed({
endpoint: new URL('https://data.razu.nl/sparql'),
fromTimeoutMs: 10_000,
toTimeoutMs: 300_000,
consecutiveTimeouts: 0,
});

const output = spy.mock.calls.map((c) => String(c[0])).join('');
expect(output).toContain('Relaxed');
expect(output).toContain('https://data.razu.nl/sparql');
});
});
});
6 changes: 3 additions & 3 deletions packages/pipeline-console-reporter/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ export default mergeConfig(
coverage: {
thresholds: {
autoUpdate: true,
functions: 63.63,
lines: 63.36,
functions: 66.66,
lines: 64.07,
branches: 42.59,
statements: 63.72,
statements: 64.42,
},
},
},
Expand Down
14 changes: 5 additions & 9 deletions packages/pipeline-shacl-sampler/src/sampleStages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ export interface ShaclSampleStagesOptions {
* @default 50
*/
samplesPerClass?: number;
/**
* SPARQL query timeout in milliseconds.
* @default 60000
*/
timeout?: number;
/**
* Maximum number of sampled subjects per executor call. Defaults to
* {@link samplesPerClass} so the whole sample fits in one CONSTRUCT
Expand Down Expand Up @@ -111,7 +106,6 @@ export async function shaclSampleStages(
options: ShaclSampleStagesOptions,
): Promise<Stage[]> {
const samplesPerClass = options.samplesPerClass ?? 50;
const timeout = options.timeout ?? 60_000;
const batchSize = options.batchSize ?? samplesPerClass;
const maxConcurrency = options.maxConcurrency;
const namespaceAliases = options.namespaceAliases ?? [];
Expand All @@ -137,7 +131,6 @@ export async function shaclSampleStages(
),
executors: new SparqlConstructExecutor({
query: buildSampleQuery(shape),
timeout,
}),
batchSize,
maxConcurrency,
Expand All @@ -153,7 +146,10 @@ function subjectSelector(
): ItemSelector {
assertSafeIri(targetClass.value);
return {
select(distribution, batchSize) {
// Forward `options` so the Pipeline’s per-dataset TimeoutPolicy
// reaches the inner SparqlItemSelector — without this the adaptive
// budget is silently bypassed for subject selection.
select(distribution, batchSize, options) {
const query = buildSubjectSelectorQuery({
targetClass,
subjectFilter: distribution.subjectFilter,
Expand All @@ -163,7 +159,7 @@ function subjectSelector(
return new SparqlItemSelector({
query,
maxResults: limit,
}).select(distribution, batchSize);
}).select(distribution, batchSize, options);
},
};
}
Expand Down
6 changes: 3 additions & 3 deletions packages/pipeline-shacl-sampler/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ export default mergeConfig(
thresholds: {
autoUpdate: true,
functions: 96.77,
lines: 97.38,
branches: 89.87,
statements: 95.18,
lines: 97.36,
branches: 89.61,
statements: 95.15,
},
},
},
Expand Down
37 changes: 15 additions & 22 deletions packages/pipeline-void/src/stage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ const queriesDir = resolve(

/**
* Options for configuring VoID stage execution.
*
* Per-request timeouts are configured at the {@link Pipeline} level via
* `PipelineOptions.timeout`; VoID stages no longer expose their own timeout
* knob. Kept as a named type so per-class / per-stages option types can
* extend it as more knobs are added.
*/
export interface VoidStageOptions {
/** SPARQL query timeout in milliseconds. @default 60000 */
timeout?: number;
}
// eslint-disable-next-line @typescript-eslint/no-empty-interface, @typescript-eslint/no-empty-object-type
export interface VoidStageOptions {}

/**
* Options for per-class VoID stages that iterate over classes.
Expand Down Expand Up @@ -66,11 +69,7 @@ async function createVoidStage(
): Promise<Stage> {
const query = await readQueryFile(resolve(queriesDir, filename));
const executor =
options?.executor?.(query) ??
new SparqlConstructExecutor({
query,
timeout: options?.timeout ?? 60_000,
});
options?.executor?.(query) ?? new SparqlConstructExecutor({ query });

if (options?.perClass) {
return new Stage({
Expand All @@ -89,7 +88,10 @@ async function createVoidStage(

function classSelector(): ItemSelector {
return {
select: (distribution, batchSize) => {
// Forward `options` so the Pipeline’s per-dataset TimeoutPolicy
// reaches the inner SparqlItemSelector — without this the adaptive
// budget is silently bypassed for class selection.
select: (distribution, batchSize, options) => {
const subjectFilter = distribution.subjectFilter ?? '';
let fromClause = '';
if (distribution.namedGraph) {
Expand All @@ -105,7 +107,7 @@ function classSelector(): ItemSelector {

return new SparqlItemSelector({
query: selectorQuery,
}).select(distribution, batchSize);
}).select(distribution, batchSize, options);
},
};
}
Expand Down Expand Up @@ -206,13 +208,7 @@ export function uriSpaces(
return createVoidStage('object-uri-space.rq', {
...options,
executor: (query) =>
new UriSpaceExecutor(
new SparqlConstructExecutor({
query,
timeout: options?.timeout ?? 60_000,
}),
uriSpaceMap,
),
new UriSpaceExecutor(new SparqlConstructExecutor({ query }), uriSpaceMap),
});
}

Expand All @@ -228,10 +224,7 @@ export function detectVocabularies(
...options,
executor: (query) =>
new VocabularyExecutor(
new SparqlConstructExecutor({
query,
timeout: options?.timeout ?? 60_000,
}),
new SparqlConstructExecutor({ query }),
options?.vocabularies
? [...defaultVocabularies, ...options.vocabularies]
: undefined,
Expand Down
2 changes: 1 addition & 1 deletion packages/pipeline-void/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export default mergeConfig(
thresholds: {
functions: 50,
lines: 78.43,
branches: 63.26,
branches: 67.44,
statements: 78.84,
},
},
Expand Down
38 changes: 38 additions & 0 deletions packages/pipeline/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,44 @@ new Stage({

This keeps SPARQL doing the heavy lifting while TypeScript handles the edge cases. See [@lde/pipeline-void](../pipeline-void)'s `VocabularyExecutor` for a real-world example of this pattern.

#### Adaptive timeouts

By default, every SPARQL request uses the same 5-minute budget. When a pipeline runs against many third-party endpoints, that fixed budget can cost ~80 minutes on a single dataset whose endpoint times out repeatedly on heavy queries — light stages on the same endpoint then sit behind the heavy ones that will never succeed.

A `TimeoutPolicy` decides the budget for each SPARQL request and observes the outcome. Two are built in:

- **`ConstantTimeoutPolicy(timeoutMs)`** – returns the same budget for every request. The implicit default when `PipelineOptions.timeout` is omitted (`constantTimeoutPolicy(300_000)`).
- **`AdaptiveTimeoutPolicy({ defaultMs, tightenedMs, tightenAfterTimeouts })`** – per-endpoint state machine. Each endpoint is either _healthy_ (use `defaultMs`) or _tightened_ (use `tightenedMs`). After `tightenAfterTimeouts` consecutive `timeout` outcomes the endpoint flips to _tightened_; a single `ok` flips it back to _healthy_.

`PipelineOptions.timeout` accepts a `() => TimeoutPolicy` factory. The pipeline invokes it once per dataset, so policy state resets between datasets and one bad dataset can’t poison the next:

```typescript
import { adaptiveTimeoutPolicy } from '@lde/pipeline';

new Pipeline({
// …
timeout: adaptiveTimeoutPolicy({
defaultMs: 300_000, // 5 min while the endpoint is healthy
tightenedMs: 10_000, // 10 s once the endpoint is tightened
tightenAfterTimeouts: 2, // flip to tightened after 2 consecutive timeouts
}),
});
```

Outcomes are classified as:

| outcome | source |
| --------- | ------------------------------------------------------------------------ |
| `ok` | the request resolved |
| `timeout` | client-side `AbortSignal.timeout()` fired, or upstream returned HTTP 504 |
| `error` | anything else (other HTTP errors, parse errors, …) – neutral |

Transitions are forwarded to the `ProgressReporter` via `timeoutTightened` / `timeoutRelaxed`; `ConsoleReporter` prints them as `↘ Tightened` / `↗ Relaxed` lines so operators can tell a fast-failed stage from an unexpected speedup.

Implement `TimeoutPolicy` directly for custom strategies (closing over shared state in the factory if you want it to span datasets).

Timeouts live at the pipeline level — neither `SparqlConstructExecutor` nor `SparqlItemSelector` accept their own `timeout` option. Per-endpoint state belongs in the adaptive policy, and per-stage budgets aren’t supported. Reusable stage facades (`@lde/pipeline-void`, `@lde/pipeline-shacl-sampler`) follow the same convention.

### Validation

Stages can optionally validate their output quads against a `Validator`. Validation operates on the **combined output of all executors per batch**, not on individual quads or per-executor output. A batch produces a complete result set — a self-contained cluster of linked resources — that can be meaningfully matched against SHACL shapes. Even with a single executor, each batch is a complete unit; with multiple executors, shapes that reference triples from different executors are validated correctly.
Expand Down
Loading