
feat(runner): minimal Pipeline SDK + BYOC hello-world E2E #7

Draft
rickstaa wants to merge 13 commits into main from feat/runner-mvp

rickstaa (Member) commented Apr 28, 2026

Summary

The Pipeline SDK — both halves: a request/response Pipeline for batch HTTP/SSE and a LivePipeline for real-time bidirectional video/audio over BYOC trickle. Developers write a single Python class and get a containerised, BYOC-compatible, schema-described capability with /health, /docs, /openapi.json, SSE streaming, and (when complete) /stream/* real-time endpoints. No go-livepeer changes required.

Each commit on this branch ships a strictly more capable SDK than the last; test.sh stays green at every step.

Authoring surfaces

Batch / streaming HTTP — Pipeline:

from livepeer_gateway.runner import Pipeline, serve
from transformers import pipeline  # HuggingFace pipeline used in setup() below

class Sentiment(Pipeline):
    def setup(self) -> None:
        self.classifier = pipeline("sentiment-analysis", model="...")

    def predict(self, text: str) -> dict:
        return self.classifier(text)[0]

if __name__ == "__main__":
    serve(Sentiment())

For binary I/O, swap text: str for image: Base64Bytes. For streaming, return a generator from predict() — the runtime auto-detects and frames each yielded value as SSE.
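
What Base64Bytes does on the wire can be sketched with the stdlib alone — a hedged illustration of the round-trip (field names `image` and `result` are made up here, not the SDK's actual schema):

```python
import base64
import json

# Client side: raw image bytes must be base64-encoded into the JSON body,
# because JSON cannot carry raw binary.
raw = b"\x89PNG\r\n\x1a\n fake image bytes"
body = json.dumps({"image": base64.b64encode(raw).decode("ascii")})

# Server side: Pydantic's Base64Bytes decodes the field back to bytes
# before predict() is called -- emulated here with stdlib base64.
decoded = base64.b64decode(json.loads(body)["image"])
assert decoded == raw

# The response path mirrors it: bytes out -> base64 string in the JSON reply.
reply = json.dumps({"result": base64.b64encode(decoded).decode("ascii")})
print(len(json.loads(reply)["result"]))
```

This is why the pipeline author sees plain `bytes` while the HTTP body stays valid JSON.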

Real-time A/V — LivePipeline (in progress in this PR):

from livepeer_gateway.runner import LivePipeline, serve

class GrayscaleFilter(LivePipeline):
    async def process_video(self, frame):
        # frame.frame is an av.VideoFrame; transform and return
        return frame

if __name__ == "__main__":
    serve(GrayscaleFilter())

Subclasses override any of setup, on_stream_start, process_video, process_audio, on_params_update, on_stream_stop (all default to no-op / passthrough). A subclass that overrides nothing is a valid passthrough relay.
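
The hook contract above can be pictured with a stand-in base class — a sketch only (the real `LivePipeline` lives in `livepeer_gateway.runner`; hook names mirror the PR's surface, bodies are illustrative):

```python
import asyncio

class LivePipelineSketch:
    """Stand-in for livepeer_gateway.runner.LivePipeline: every hook
    defaults to no-op / passthrough, so an empty subclass is a valid relay."""
    def setup(self): pass
    async def on_stream_start(self, params): pass
    async def process_video(self, frame): return frame   # passthrough
    async def process_audio(self, frame): return frame   # passthrough
    async def on_params_update(self, params): pass
    async def on_stream_stop(self): pass

class Negate(LivePipelineSketch):
    # Override only the hook you care about; "frames" are ints here for
    # illustration (the real hook receives av.VideoFrame wrappers).
    async def process_video(self, frame):
        return -frame

async def main():
    relay, negate = LivePipelineSketch(), Negate()
    print(await relay.process_video(7))    # unchanged: 7
    print(await negate.process_video(7))   # transformed: -7

asyncio.run(main())
```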

What's in this PR

Commit-by-commit progression (each shippable on its own):

| Commit | What it adds |
| --- | --- |
| C1 — minimal Pipeline SDK + hello-world E2E | Pipeline ABC with predict() + aiohttp serve() + first BYOC docker-compose |
| C2 — setup() lifecycle + sentiment example | One-time model loading hook; HuggingFace sentiment classifier example |
| C3 — FastAPI migration | aiohttp → FastAPI; brings /docs, /openapi.json, /redoc for free |
| C4 — Pydantic typed I/O via signature introspection | predict() params with type hints become a Pydantic model automatically; explicit BaseModel parameter also supported; /docs shows real fields |
| C5 — image upscale example (binary via Base64Bytes) | Pydantic's Base64Bytes proves the SDK handles binary I/O cleanly |
| C6 — /health state machine | LOADING / OK / ERROR / IDLE matching go-livepeer's HealthCheck wire format |
| C7 — SSE auto-detection + LLM chat example | Generator predict() → text/event-stream with [DONE] terminator; Qwen2.5-0.5B example |
| C8 Step 1 — LivePipeline ABC + /stream/* HTTP skeleton (landed) | New LivePipeline base class + _make_live_pipeline_app dispatch path; routes accept the orchestrator's wire contract (validated via Pydantic); streaming logic lands in subsequent commits |
| C8 Step 2 — trickle bytes-through on /stream/start\|stop (landed) | _run_passthrough bridges subscribe → publish via existing TrickleSubscriber / TricklePublisher, segment-aligned and unmodified. /stream/start spawns the bridge as a background task on the pipeline; /stream/stop cancels and waits up to 5s for cleanup. Single-session for now (409 on double-start). |
| C8 Step 3 — frame decode → user transform → encode loop (planned) | Reuse existing MediaOutput / MediaPublish; introduce livepeer_gateway.runner.frames namespace (clean VideoFrame / AudioFrame aliases) |
| C8 Step 4 — lifecycle hooks complete (planned) | on_stream_start / on_params_update / on_stream_stop dispatch; emit_event / emit_data over events / data trickle channels; introduce _LiveSession to encapsulate per-session state |
| C8 Step 5 — examples/runner/live_grayscale/ end-to-end (planned) | Full BYOC compose with go-livepeer orchestrator, register_capability, test.sh exercising real stream lifecycle |
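
The C8 Step 2 bridge shape — background task per session, cancel-and-drain on stop — can be sketched with asyncio queues standing in for TrickleSubscriber / TricklePublisher (the real classes do HTTP trickle I/O; only the task and cancellation structure is shown):

```python
import asyncio

async def run_passthrough(subscribe, publish):
    """Copy segments subscribe -> publish unmodified until cancelled."""
    try:
        while True:
            segment = await subscribe.get()     # stand-in for segment fetch
            await publish.put(segment)          # stand-in for segment publish
    finally:
        await publish.put(None)                 # end-of-stream marker on teardown

async def main():
    sub, pub = asyncio.Queue(), asyncio.Queue()
    task = asyncio.create_task(run_passthrough(sub, pub))   # /stream/start
    for seg in (b"seg0", b"seg1"):
        await sub.put(seg)
    print(await pub.get(), await pub.get())                 # b'seg0' b'seg1'
    task.cancel()                                           # /stream/stop
    await asyncio.wait([task], timeout=5)                   # bounded cleanup wait
    print(await pub.get())                                  # None (teardown marker)

asyncio.run(main())
```

The `finally` block is the part the PR's test plan stresses: cancellation must propagate through cleanup so /stream/stop can return within its 5s budget.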

Surface

| Module / class | What it is |
| --- | --- |
| livepeer_gateway.runner.Pipeline | ABC with setup() and abstract predict(**kwargs) -> Any |
| livepeer_gateway.runner.LivePipeline | Base class for real-time A/V pipelines on the BYOC trickle protocol; default-passthrough hooks for process_video / process_audio |
| livepeer_gateway.runner.PipelineState | LOADING / OK / ERROR / IDLE enum; matches go-livepeer HealthCheck format |
| livepeer_gateway.runner.serve(pipeline, *, host, port) | FastAPI app + uvicorn server; dispatches on Pipeline vs LivePipeline |
| livepeer_gateway.runner.make_app(pipeline) | Just the FastAPI app (for tests, custom uvicorn config) |
| POST /predict (Pipeline) | Body validated via Pydantic from predict()'s signature; returns JSON, or text/event-stream if predict() is a generator |
| POST /stream/start \| stop \| params (LivePipeline) | Real-time stream lifecycle endpoints; bodies match the orchestrator's wire contract |
| GET /health | HealthResponse { status: PipelineState } — orchestrator-aligned |
| GET /docs, GET /openapi.json, GET /redoc | Standard FastAPI surface |
| examples/runner/hello_world/ | Smoke test — minimal Pipeline + Dockerfile + compose + register_capability + curl test.sh |
| examples/runner/sentiment/ | setup() lifecycle + HF sentiment classifier |
| examples/runner/image_upscale/ | Binary I/O via Pydantic Base64Bytes — Swin2SR ~2x super-resolution |
| examples/runner/llm/ | SSE streaming via TextIteratorStreamer — Qwen2.5-0.5B-Instruct |
| examples/runner/live_grayscale/ (planned) | Real-time A/V pipeline E2E example via LivePipeline + go-livepeer trickle |
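
The SSE auto-detection for generator predict() frames output roughly like this — a sketch of the OpenAI-style `data:` wire format with `[DONE]` terminator, not the SDK's actual implementation:

```python
import json

def sse_frames(gen):
    """Frame each yielded value as an SSE event; terminate with [DONE]."""
    for chunk in gen:
        yield f"data: {json.dumps(chunk)}\n\n"
    yield "data: [DONE]\n\n"

def fake_predict():          # stands in for a generator predict()
    yield {"token": "Hello"}
    yield {"token": " world"}

wire = "".join(sse_frames(fake_predict()))
print(wire)
```

Both the BYOC gateway and the Python caller-side gateway watch for the `[DONE]` line to end the stream, which is why the terminator is part of the contract rather than a convenience.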

The container's /predict and /stream/* paths match the existing go-livepeer BYOC contract, verified against byoc/stream_orchestrator.go. No go-livepeer changes required.

Authoring patterns (Pipeline)

# Plain typed kwargs — Pydantic model built automatically
class Sentiment(Pipeline):
    def predict(self, text: str) -> dict: ...

# Explicit Pydantic input + output — full /docs + typed response
class Request(BaseModel):
    text: str
    threshold: float = 0.5

class Response(BaseModel):
    label: str
    score: float

class Sentiment(Pipeline):
    def predict(self, body: Request) -> Response: ...

# Streaming via generator return
class LLM(Pipeline):
    def predict(self, prompt: str) -> Iterator[ChatChunk]:
        for token in self.streamer:
            yield ChatChunk(token=token)
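
The "plain typed kwargs" path can be sketched with stdlib inspect — this shows only the field extraction that would feed pydantic.create_model (the (annotation, default) pairs it expects); the SDK's actual logic may differ:

```python
import inspect

def fields_from_predict(fn):
    """Map predict()'s typed params to (annotation, default) pairs --
    the shape pydantic.create_model takes, with ... meaning required."""
    fields = {}
    for name, p in inspect.signature(fn).parameters.items():
        if name == "self":
            continue
        default = ... if p.default is inspect.Parameter.empty else p.default
        fields[name] = (p.annotation, default)
    return fields

class Sentiment:
    def predict(self, text: str, threshold: float = 0.5) -> dict: ...

print(fields_from_predict(Sentiment.predict))
# e.g. {'text': (<class 'str'>, Ellipsis), 'threshold': (<class 'float'>, 0.5)}
```

With those pairs in hand, `create_model("PredictInput", **fields)` would yield the body model that drives both validation and /docs.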

Test plan

Each example ships its own test.sh that prints PASS on success.

  • Local Python: uv run python examples/runner/hello_world/pipeline.py, then curl localhost:5000/{health,predict} returns expected JSON.
  • Hello world E2E: cd examples/runner/hello_world && docker compose up -d --wait && ./test.sh → PASS. Round-trip: curl → gateway → orchestrator → SDK container.
  • Sentiment: setup() loads HF model once; test.sh exercises POSITIVE / NEGATIVE cases via EXPECTED_LABEL.
  • Image upscale: Base64Bytes round-trip; output asserted to be at least 2x input dimensions.
  • LLM: SSE round-trip via curl -N; assertion validates token framing + [DONE] terminator.
  • OpenAPI: /docs and /openapi.json render for every example with the actual field names (no additionalProp1).
  • Health state machine: /health returns LOADING during setup(), OK after, ERROR on setup failure.
  • LivePipeline skeleton: /stream/start|stop|params accept the orchestrator's wire contract; missing required fields → 422; /docs and /openapi.json show the new routes; batch Pipeline regression-checked.
  • LivePipeline trickle lifecycle: full session lifecycle (idle stop, missing-URL → 400, double-start → 409, cancel + restart, idempotent stop) tested via curl against a local server; trickle code paths exercised end-to-end including aiohttp connection setup, segment fetch loop, cancellation propagation through finally blocks.
  • LivePipeline E2E against go-livepeer: real Docker compose with live_grayscale example, full stream lifecycle (start → frames → stop) — lands with C8 Step 5.
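
The health state machine exercised above can be sketched as follows — the enum values mirror the PR's PipelineState; the transition helper is illustrative, not the SDK's code:

```python
from enum import Enum

class PipelineState(str, Enum):       # mirrors go-livepeer's HealthCheck strings
    LOADING = "LOADING"
    OK = "OK"
    ERROR = "ERROR"
    IDLE = "IDLE"

def run_setup(pipeline_setup):
    """LOADING while setup() runs; OK on success, ERROR on failure."""
    state = PipelineState.LOADING
    try:
        pipeline_setup()
        state = PipelineState.OK
    except Exception:
        state = PipelineState.ERROR
        raise                          # re-raise so the container exits fail-fast
    finally:
        print(state.value)

run_setup(lambda: None)                # prints OK
```

The re-raise matches the C6 commit's note: /health reports ERROR, but the process still dies so the orchestrator sees the failure.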

Compose details

Each example's docker-compose.yml mirrors go-livepeer/doc/byoc.md:

  • livepeer/go-livepeer:master for orchestrator + gateway (no local build prerequisite)
  • On-chain mode against the free public Arbitrum RPC, pricePerUnit 0 → no real chain interaction, no funded wallet
  • Drops to bare -network offchain once livepeer/go-livepeer#3906 ships in :master — TODO comments in each compose track the cleanup
  • Examples that load HF models bake them into the image at build time via prepare_models.py so setup() loads from local cache in milliseconds

What's next (separate PRs, after this one merges)

Tracked in #8 — Pipeline SDK roadmap:

  • C9 — livepeer push CLI + livepeer.yaml manifest
  • C10 — Schema as Docker image label (org.livepeer.pipeline.schema)
  • C11 — Agent-friendly docs (AGENTS.md, expanded docstrings, examples/runner/_template/)
  • C12 — Migrate to monorepo with PEP 420 namespace packages (livepeer.runner.*, livepeer.client.*, livepeer.trickle.*)

Related work

🤖 Generated with Claude Code

rickstaa and others added 3 commits April 29, 2026 00:19
Adds livepeer_gateway.runner — a Pipeline ABC and a thin aiohttp serve
layer — plus a hello-world example that runs end-to-end against an
unmodified go-livepeer BYOC stack.

Surface:
- livepeer_gateway.runner.Pipeline — ABC with predict()
- livepeer_gateway.runner.serve(pipeline) → aiohttp app:
  - POST /predict — body JSON kwargs to predict();
                    TypeError → 400, other exception → 500
  - GET  /health  — {"status": "ready"}
- examples/runner/hello_world/ — Pipeline subclass + Dockerfile +
  docker-compose + capability registration + e2e curl test

The container's /predict path matches the existing go-livepeer BYOC
contract — no go-livepeer changes required.
./examples/runner/hello_world/test.sh printing PASS proves the round-trip:
curl → gateway → orchestrator → SDK container → response.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pipeline.setup() is a non-abstract no-op called once before serve()
accepts requests. Subclasses override to load model weights.

Adds examples/runner/sentiment/ — a Pipeline subclass that classifies
text via Hugging Face transformers. setup() loads the distilbert model
from the local HF cache populated at build time by prepare_models.py.

Surface:
- Pipeline.setup() no-op default
- make_app() invokes pipeline.setup() before binding routes
- examples/runner/sentiment/ — pipeline + prepare_models + Dockerfile +
  docker-compose + register + test.sh + README

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Tracks operational items not suited to code comments — examples
extraction trigger, BYOC offchain compose cleanup pending #3906,
SDK feature gaps mapped to planned commits, related upstream PRs.

Working surface, drained as items land.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@rickstaa rickstaa mentioned this pull request Apr 29, 2026
19 tasks
rickstaa and others added 8 commits April 29, 2026 11:57
Replaces the aiohttp serve layer with FastAPI + uvicorn. Pipeline API
unchanged — Pipeline.predict() and Pipeline.setup() behave identically.

Free additions from FastAPI:
- GET /docs (Swagger UI)
- GET /redoc
- GET /openapi.json (minimal until Input/Output land)

Handler dispatch:
- /predict and /health are sync def, so pipeline.predict() (CPU/GPU
  bound) runs in FastAPI's threadpool and never blocks the event loop.
- Request body parsed via Body(...) — framework handles JSON parse
  errors and dict-type validation, returning HTTP 422.

Notes:
- Error response shape changes from {"error": ...} to {"detail": ...}.
  Body validation errors return 422 (was 400 in aiohttp). Other status
  codes unchanged: TypeError on wrong predict() kwargs → 400; pipeline
  exceptions → 500.
- aiohttp stays in deps; livepeer_gateway.transport's trickle client
  uses aiohttp.ClientSession. FastAPI server + aiohttp client coexist.

Refs #8 (C3)
Switches expose: to ports: so /docs, /redoc, and /openapi.json
are browsable on http://localhost:5000 during dev. Example
READMEs updated.
…tion

predict()'s signature drives FastAPI's body type and response model.
Two paths:
- Explicit BaseModel param: pass body to predict() directly
- Bare typed params: auto-derive a Pydantic model via create_model
  and unpack as kwargs

OpenAPI now reflects real types — /docs shows declared fields with
descriptions, examples, constraints, and typed responses when the
return annotation is a BaseModel.

Refs #8 (C4)
…se64Bytes

Swin2SR x2 super-resolution as a BYOC capability. Input image is a
base64-encoded JPEG/PNG; output is a base64-encoded PNG. Pydantic's
Base64Bytes auto-decodes the request body to bytes, so the pipeline
gets bytes directly and the SDK ships zero binary-handling code.

Refs #8 (C5)
Pipeline tracks state across setup() and exposes it via /health,
matching go-livepeer's HealthCheck wire format
(ai/worker/runner.gen.go). Re-raises on setup() failure so the
container still exits fail-fast.

Refs #8 (C6)
When predict() is a generator, the SDK wraps the response with
StreamingResponse(text/event-stream) and frames each yielded value as
an OpenAI-style SSE event terminated by [DONE]. Both go-livepeer's
BYOC gateway and the Python caller-side gateway watch for [DONE] to
end the stream.

Co-authored-by: John | Elite Encoder <john@eliteencoder.net>
pricePerUnit=0 means no orchestrator charges, no ticket settlement,
empty wallet stays unused. Replaces the previous pricePerUnit=1
workaround that relied on tickets rarely firing.
rickstaa and others added 2 commits April 30, 2026 13:45
Adds LivePipeline base class with setup/on_stream_start/process_video/
process_audio/on_params_update/on_stream_stop hooks (all default-passthrough)
plus emit_event/emit_data stubs. Splits make_app dispatch into
_make_pipeline_app (Pipeline → /predict) and _make_live_pipeline_app
(LivePipeline → /stream/start|stop|params), sharing _run_setup and
_add_health_route. Routes accept and validate the orchestrator's wire
contract; streaming coordinator (subscribe/publish loops, lifecycle
dispatch) lands in subsequent commits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds _run_passthrough coroutine that bridges subscribe → publish trickle
channels segment-by-segment using the existing TrickleSubscriber and
TricklePublisher. /stream/start spawns it as a background task on the
LivePipeline; /stream/stop cancels and waits up to 5s for graceful
cleanup before returning. Single-session for now (409 on double-start);
data-only / event-only streams (no subscribe_url + publish_url) return
400 — both extensions land in subsequent commits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>