
Unify pipeline extension surface: capability slots on PipelinePlugin + fetch middleware #422

@ddeboer


Context

PR #421 introduced TimeoutPolicy as a dedicated PipelineOptions.timeout field — a parallel extension mechanism alongside the existing PipelinePlugin interface. That was the wrong altitude: the README already advertises plugins as the lifecycle extension surface, and adding a sibling option silently widens the public API.

The right model is to unify all pipeline extension under PipelinePlugin, treating different concerns as typed capability slots rather than routing them all through a single hook with one fixed return type.

Problem with the current shape

  • TimeoutPolicy and PipelinePlugin are two unrelated extension surfaces. New contributors have to learn both, and the choice of which to use for a new resilience concern (retry, rate limit, telemetry, request rewriting) is undefined.
  • SparqlConstructExecutorOptions.fetcher is the only escape hatch for HTTP-level customization (auth headers, mirror routing, instrumentation). It silently bypasses TimeoutPolicy because SparqlEndpointFetcher exposes no per-call signal. The JSDoc added in #421 (feat(pipeline): adaptive per-endpoint SPARQL timeouts) documents this gap honestly, but the gap remains.
  • classifyOutcome lives in executor.ts and selector.ts as duplicated private code. Third-party Executor implementations that want to feed a TimeoutPolicy would have to re-implement the classification by hand.

Proposal

Widen PipelinePlugin to a bag of typed capabilities. Each capability has its own contract; plugins implement only the slots they care about.

```typescript
export interface PipelinePlugin {
  name: string;

  // existing
  beforeStageWrite?: QuadTransform;

  // new: typed capability slots
  timeoutPolicy?: TimeoutPolicy;
  fetch?: FetchMiddleware;

  // additive observers
  onRequestStart?: (ctx: RequestContext) => void;
  onRequestEnd?: (ctx: RequestContext) => void;
}

export type FetchMiddleware = (
  request: Request,
  next: (request: Request) => Promise<Response>,
) => Promise<Response>;
```

Composition rules per capability:

| capability | shape | composition |
|---|---|---|
| beforeStageWrite | QuadTransform | chain left-to-right (existing) |
| timeoutPolicy | TimeoutPolicy | single; throw at construction if multiple plugins supply one |
| fetch | FetchMiddleware | onion compose (standard Express/Koa/Hono pattern) |
| onRequestStart/End | (ctx) => void | additive; all observers fire |
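The composition rules in the table can be sketched as three small resolver functions. This is a minimal sketch under stated assumptions, not the Pipeline's implementation: RequestContext and the beforeRequest signature on TimeoutPolicy are hypothetical stand-ins, beforeStageWrite is left out because its chaining already exists, and resolveTimeoutPolicy, composeFetch and composeObservers are illustrative names.

```typescript
// Hypothetical stand-ins: only the shapes matter for this sketch.
type RequestContext = { endpoint: string };

interface TimeoutPolicy {
  beforeRequest(endpoint: string): number; // hypothetical: timeout in ms
}

type Handler = (request: Request) => Promise<Response>;
type FetchMiddleware = (request: Request, next: Handler) => Promise<Response>;

interface PipelinePlugin {
  name: string;
  timeoutPolicy?: TimeoutPolicy;
  fetch?: FetchMiddleware;
  onRequestStart?: (ctx: RequestContext) => void;
  onRequestEnd?: (ctx: RequestContext) => void;
}

// "single": at most one plugin may supply a timeoutPolicy; fail fast otherwise.
function resolveTimeoutPolicy(plugins: PipelinePlugin[]): TimeoutPolicy | undefined {
  const suppliers = plugins.filter((p) => p.timeoutPolicy !== undefined);
  if (suppliers.length > 1) {
    const names = suppliers.map((p) => p.name).join(', ');
    throw new Error(`Only one plugin may supply a timeoutPolicy, got: ${names}`);
  }
  return suppliers[0]?.timeoutPolicy;
}

// "onion": the first plugin is the outermost layer; `terminal` performs the real request.
function composeFetch(plugins: PipelinePlugin[], terminal: Handler): Handler {
  return plugins
    .map((p) => p.fetch)
    .filter((m): m is FetchMiddleware => m !== undefined)
    .reduceRight<Handler>((next, middleware) => (request) => middleware(request, next), terminal);
}

// "additive": every observer fires, in plugin order.
function composeObservers(plugins: PipelinePlugin[]) {
  return {
    onRequestStart(ctx: RequestContext): void {
      for (const p of plugins) p.onRequestStart?.(ctx);
    },
    onRequestEnd(ctx: RequestContext): void {
      for (const p of plugins) p.onRequestEnd?.(ctx);
    },
  };
}
```

Under these rules plugin order matters only for fetch (the first plugin sees the request first and the response last) and for the order in which observers fire; the single-supplier rule makes order irrelevant for timeouts.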

Usage stays clean:

```typescript
new Pipeline({
  plugins: [
    adaptiveTimeoutPlugin({
      defaultMs: 300_000,
      tightenedMs: 10_000,
      tightenAfterTimeouts: 2,
    }),
    bearerAuthPlugin({ token: process.env.SPARQL_TOKEN }),
    telemetryPlugin({ exporter: otel }),
    provenancePlugin(),
  ],
});
```

adaptiveTimeoutPlugin(opts) is a one-liner: { name: 'adaptive-timeout', timeoutPolicy: new AdaptiveTimeoutPolicy(opts) }. The Pipeline scans plugins, picks the single timeoutPolicy, and threads it through Stage.run exactly as it does today.
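A possible shape for that one-liner, with a toy AdaptiveTimeoutPolicy so the sketch runs. The beforeRequest/afterRequest signatures, the Outcome union, and the tightening rule (an endpoint switches to tightenedMs once it has timed out tightenAfterTimeouts times, and never relaxes) are assumptions for illustration; the policy from #421 may behave differently.

```typescript
// Hypothetical shapes; the real TimeoutPolicy from #421 may differ.
type Outcome = 'success' | 'timeout' | 'error';

interface TimeoutPolicy {
  beforeRequest(endpoint: string): number; // timeout in ms for the next request
  afterRequest(endpoint: string, outcome: Outcome): void;
}

interface AdaptiveTimeoutOptions {
  defaultMs: number;
  tightenedMs: number;
  tightenAfterTimeouts: number;
}

// Toy rule: count timeouts per endpoint and tighten once the threshold is reached.
class AdaptiveTimeoutPolicy implements TimeoutPolicy {
  private readonly opts: AdaptiveTimeoutOptions;
  // Timeouts observed so far, per endpoint; lives as long as this instance.
  private readonly timeouts = new Map<string, number>();

  constructor(opts: AdaptiveTimeoutOptions) {
    this.opts = opts;
  }

  beforeRequest(endpoint: string): number {
    const count = this.timeouts.get(endpoint) ?? 0;
    return count >= this.opts.tightenAfterTimeouts ? this.opts.tightenedMs : this.opts.defaultMs;
  }

  afterRequest(endpoint: string, outcome: Outcome): void {
    if (outcome === 'timeout') {
      this.timeouts.set(endpoint, (this.timeouts.get(endpoint) ?? 0) + 1);
    }
  }
}

interface PipelinePlugin {
  name: string;
  timeoutPolicy?: TimeoutPolicy;
}

// The plugin factory is only a wrapper around the policy.
const adaptiveTimeoutPlugin = (opts: AdaptiveTimeoutOptions): PipelinePlugin => ({
  name: 'adaptive-timeout',
  timeoutPolicy: new AdaptiveTimeoutPolicy(opts),
});
```

Written this way the slot holds one policy instance per plugin, so keeping the per-dataset reset would require the Pipeline to obtain a fresh policy per dataset (for example by accepting a factory in the slot); the sketch does not settle that choice.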

bearerAuthPlugin ships a fetch middleware that adds the Authorization header. The executor always constructs its own SparqlEndpointFetcher and passes the composed middleware chain as the fetcher's fetch callback — no more user-fetcher escape hatch needed.
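The bearer-auth middleware and the executor-side wiring might look roughly like this. bearerAuthPlugin's option shape and the toFetchCallback helper are hypothetical; the sketch assumes only the WHATWG fetch globals (Request, Headers, Response, fetch) available in Node 18+. Whether SparqlEndpointFetcher's fetch option takes exactly this (input, init) signature should be checked against its typings.

```typescript
type Handler = (request: Request) => Promise<Response>;
type FetchMiddleware = (request: Request, next: Handler) => Promise<Response>;

interface PipelinePlugin {
  name: string;
  fetch?: FetchMiddleware;
}

// Hypothetical factory matching the proposal's usage: adds a bearer token to every request.
function bearerAuthPlugin(opts: { token: string | undefined }): PipelinePlugin {
  return {
    name: 'bearer-auth',
    fetch: (request, next) => {
      if (!opts.token) return next(request); // no token configured: pass through untouched
      // Copy rather than mutate, so the caller's Request is left unchanged.
      const headers = new Headers(request.headers);
      headers.set('Authorization', `Bearer ${opts.token}`);
      return next(new Request(request, { headers }));
    },
  };
}

// Folds the plugins' middleware into one fetch-style callback that an executor
// could hand to its fetcher; by default the innermost layer is the global fetch.
function toFetchCallback(
  plugins: PipelinePlugin[],
  terminal: Handler = (request) => fetch(request),
): (input: string | URL | Request, init?: RequestInit) => Promise<Response> {
  const chain = plugins
    .map((p) => p.fetch)
    .filter((m): m is FetchMiddleware => m !== undefined)
    .reduceRight<Handler>((next, middleware) => (request) => middleware(request, next), terminal);
  return (input, init) => chain(new Request(input, init));
}
```

Because the callback normalizes (input, init) into a Request up front, every middleware sees a complete Request object regardless of how the fetcher invokes it.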

What this subsumes

This redesign supersedes several smaller follow-ups surfaced by #421's review:

  • Drop SparqlConstructExecutorOptions.fetcher in favour of a fetch callback. The middleware chain is the canonical seam; auth, mocking, mirroring, and tracing all go through it.
  • Make the outcome classifier injectable. A custom TimeoutPolicy (or even a separate plugin slot) can override classifyOutcome if needed.
  • Move classifyOutcome out of executor.ts / selector.ts. It becomes part of the plugin surface — exported alongside TimeoutPolicy.
  • Add per-stage timeout budgets (separate ask raised during PR review). Plugins are already per-pipeline; a future StagePlugin slot, or a timeoutPolicy that reads stage context, fits naturally without growing the option bag.
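Once classifyOutcome is exported, a third-party Executor can feed a TimeoutPolicy without re-implementing the classification. In this sketch the classification rule (no error means success, an error named TimeoutError or AbortError means timeout, anything else is a plain error), the Outcome union, and the timedRequest helper are all hypothetical; the private classifiers in executor.ts and selector.ts may distinguish more cases.

```typescript
// Hypothetical outcome union shared by the executor, selector and TimeoutPolicy.
export type Outcome = 'success' | 'timeout' | 'error';

export interface TimeoutPolicy {
  beforeRequest(endpoint: string): number; // timeout in ms for the next request
  afterRequest(endpoint: string, outcome: Outcome): void;
}

// Hypothetical rule: no error is success; abort and timeout error names count as timeouts.
export function classifyOutcome(error?: unknown): Outcome {
  if (error === undefined) return 'success';
  const name = (error as { name?: unknown } | null)?.name;
  return name === 'TimeoutError' || name === 'AbortError' ? 'timeout' : 'error';
}

// How a third-party executor might wrap a single request with a policy.
export async function timedRequest<T>(
  policy: TimeoutPolicy,
  endpoint: string,
  run: (signal: AbortSignal) => Promise<T>,
): Promise<T> {
  const signal = AbortSignal.timeout(policy.beforeRequest(endpoint));
  const result = await run(signal).catch((error: unknown) => {
    policy.afterRequest(endpoint, classifyOutcome(error));
    throw error;
  });
  policy.afterRequest(endpoint, classifyOutcome());
  return result;
}
```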

Out of scope

  • The per-dataset state-reset semantics already enforced by Pipeline (timeoutFactory() called once per dataset) carry over unchanged: the policy is still scoped per dataset.
  • HTTP/2 connection pooling and full circuit-breaker semantics (cockatiel-style) remain out of scope; this issue is about the extension shape, not adding new resilience primitives.

Sequencing

Land #421 as-is. The TimeoutPolicy interface and its internal model (per-dataset, beforeRequest/afterRequest) survive the refactor unchanged — only the way it enters the Pipeline (dedicated timeout option vs plugins: [...] entry) changes.

A follow-up PR implements this design: widens PipelinePlugin, wraps AdaptiveTimeoutPolicy/ConstantTimeoutPolicy in plugin factories, removes PipelineOptions.timeout and SparqlConstructExecutorOptions.fetcher, and updates the README to reflect the unified surface.

Related: #421.
