Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,9 @@ lp_rpc.proto

# environment file
env

# IDE
.vscode/
*.code-workspace
.idea/
.codex
75 changes: 75 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# TODO

Repo-level follow-ups that don't belong in code comments or spec docs.
Pending operational decisions and triggers for future work.

## Examples directory

- [ ] **Extract `examples/runner/` to a dedicated repo
(`livepeer/livepeer-pipeline-examples`)** when any of these triggers hit:
- 3+ runner examples in this repo
- First multi-GB-model example (image gen, LLM, real-time video) — CI
cost becomes painful
- First community contribution
- Cog precedent: `replicate/cog-examples` lives separately from `cog`.

Keep `hello_world` in this repo as the CI smoke test even after extraction.
Each example in the new repo pins to a specific `livepeer-gateway` version.

## BYOC offchain support

- [ ] **Drop `-network arbitrum-one-mainnet`, `-ethUrl`, `-ethPassword`
from example compose files** once
[livepeer/go-livepeer#3906](https://github.com/livepeer/go-livepeer/pull/3906)
ships in `:master`. After that, examples can run with bare
`-network offchain`. Tracked: TODO comment already in each compose.

## SDK round-trip

- [ ] **Replace `test.sh`'s curl + base64 Livepeer header with a Python SDK
batch caller** once a batch caller lands (built on
[livepeer/livepeer-python-gateway#6](https://github.com/livepeer/livepeer-python-gateway/pull/6)'s
signing primitives). At that point the example compose can drop the
`gateway` service entirely — caller talks direct to the orchestrator via
the remote signer (per
[livepeer/go-livepeer#3869](https://github.com/livepeer/go-livepeer/pull/3869)).
Tracked: TODO comment in each example's `test.sh`.

## SDK feature gaps

- [ ] **Health state machine** (`LOADING / READY / ERROR / IDLE`) on
`/health` body. Currently `setup()` blocks the server bind, and the
example uses a docker compose healthcheck as a workaround. When the
state machine lands, drop the healthcheck from `sentiment/docker-compose.yml`.

- [ ] **`Input()` / `Output()` typed descriptors** (C3 in the planned
staircase). Required before schema generation.

- [ ] **Schema generation + `GET /openapi.json`** (C4). Inspects
`predict()` / `on_frame()` signature, emits OpenAPI JSON.

- [ ] **`GET /` discovery doc** (C5). Points at schema URL, capability id,
version, supported transports. Cog parallel.

- [ ] **`StreamPipeline` for trickle transport.** With `on_frame()` /
`on_video_frame()` / `on_audio_frame()`. Reuse existing trickle primitives
in `livepeer_gateway.transport`.

- [ ] **SSE auto-detection** for generators. When `predict()` yields,
emit `text/event-stream`.

- [ ] **`livepeer push` CLI** + `livepeer.yaml` manifest. Parses the
manifest, generates the Dockerfile (no more hand-written examples),
builds, registers. Drops the example Dockerfiles.

- [ ] **Schema as Docker image label** (`org.livepeer.pipeline.schema`).
Lands with `livepeer push`. Removes the runtime `/openapi.json` as the
primary schema delivery; keeps it as a fallback.

## Tracking related upstream work

- go-livepeer offchain BYOC: [#3905](https://github.com/livepeer/go-livepeer/issues/3905) /
[#3906](https://github.com/livepeer/go-livepeer/pull/3906)
- BYOC remote signer: [#3869](https://github.com/livepeer/go-livepeer/pull/3869)
- Caller-side BYOC SDK: [#6](https://github.com/livepeer/livepeer-python-gateway/pull/6)
- This SDK's draft PR: [#7](https://github.com/livepeer/livepeer-python-gateway/pull/7)
15 changes: 15 additions & 0 deletions examples/runner/hello_world/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM python:3.11-slim

WORKDIR /app

# Install the package from source — pulls aiohttp, grpcio, protobuf, av per
# pyproject.toml. Build context is the repo root.
COPY pyproject.toml README ./
COPY src /app/src
RUN pip install --no-cache-dir /app

COPY examples/runner/hello_world/pipeline.py /app/pipeline.py

EXPOSE 5000

CMD ["python", "/app/pipeline.py"]
51 changes: 51 additions & 0 deletions examples/runner/hello_world/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Hello world (BYOC)

Smallest end-to-end test of the Pipeline SDK against a real
[go-livepeer](https://github.com/livepeer/go-livepeer) BYOC stack. A `Pipeline`
subclass returns `{"message": "hello, X"}` over HTTP. Registered as a BYOC
capability, called through the gateway, response flows back end-to-end.

## Run

```bash
docker compose up -d
./test.sh
docker compose down
```

`test.sh` prints `PASS` on success.

## Browse the API

- Swagger UI: <http://localhost:5000/docs>
- ReDoc: <http://localhost:5000/redoc>
- OpenAPI JSON: <http://localhost:5000/openapi.json>

## What's running

```mermaid
sequenceDiagram
autonumber
participant curl
participant gateway
participant orchestrator
participant hello_world as hello_world<br/>(SDK container)

curl->>gateway: POST /process/request/predict
gateway->>orchestrator: forward (Livepeer-signed)
orchestrator->>hello_world: POST /predict {"name":"..."}
hello_world-->>orchestrator: {"message":"hello, ..."}
orchestrator-->>gateway: response
gateway-->>curl: response
```

Four compose services:

| Service | What it is |
| --- | --- |
| `gateway`, `orchestrator` | `livepeer/go-livepeer:master` from Docker Hub |
| `hello_world` | The pipeline container — a [BYOC](https://github.com/livepeer/go-livepeer/blob/main/doc/byoc.md) capability built with `livepeer_gateway.runner`. Attached via HTTP register, not the `-aiWorker` mechanism. |
| `register_capability` | One-shot helper that POSTs to `orchestrator:8935/capability/register` |

First `docker compose up` pulls `livepeer/go-livepeer:master` (~few hundred MB,
cached after) and builds the `hello_world` image locally.
67 changes: 67 additions & 0 deletions examples/runner/hello_world/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
services:
# Mirrors go-livepeer/doc/byoc.md: on-chain mode against a public Arbitrum
# RPC, but `pricePerUnit 0` keeps the registration free — no balance ever
# leaves the auto-generated keystore. No real chain interaction occurs.
#
# TODO: once livepeer/go-livepeer#3906 ships in :master, drop -network /
# -ethUrl / -ethPassword and run with bare `-network offchain`. Tracked
# in livepeer/go-livepeer#3905.

orchestrator:
image: livepeer/go-livepeer:master
container_name: orchestrator
command: >
-network arbitrum-one-mainnet
-orchestrator
-ethUrl https://arb1.arbitrum.io/rpc
-ethPassword secret-password
-pricePerUnit 0
-serviceAddr=orchestrator:8935
-orchSecret=orch-secret
-v 6

gateway:
image: livepeer/go-livepeer:master
container_name: gateway
command: >
-network arbitrum-one-mainnet
-gateway
-ethUrl https://arb1.arbitrum.io/rpc
-ethPassword secret-password
-orchAddr=orchestrator:8935
-httpAddr=0.0.0.0:9935
-httpIngest
-v 6
ports:
- "9935:9935"
depends_on:
- orchestrator

hello_world:
build:
context: ../../..
dockerfile: examples/runner/hello_world/Dockerfile
container_name: hello_world
ports:
- "5000:5000"
depends_on:
- orchestrator

register_capability:
image: python:3.11-slim
container_name: register_capability
command: sh -c "pip install --quiet requests && python /app/register_capability.py"
volumes:
- ./register_capability.py:/app/register_capability.py:ro
environment:
ORCH_URL: https://orchestrator:8935
ORCH_SECRET: orch-secret
CAPABILITY_NAME: hello-world
CAPABILITY_URL: http://hello_world:5000
depends_on:
- hello_world
- orchestrator

networks:
default:
name: livepeer
12 changes: 12 additions & 0 deletions examples/runner/hello_world/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Hello-world BYOC pipeline. Run via ``docker compose up``."""

from livepeer_gateway.runner import Pipeline, serve


class HelloWorld(Pipeline):
def predict(self, name: str = "world") -> dict:
return {"message": f"hello, {name}"}


if __name__ == "__main__":
serve(HelloWorld())
52 changes: 52 additions & 0 deletions examples/runner/hello_world/register_capability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Register the hello-world capability with the orchestrator.

Wire format follows
https://github.com/livepeer/go-livepeer/blob/main/doc/byoc.md
(handler in ``byoc/job_orchestrator.go``).
"""

import os
import sys
import time

import requests
import urllib3

# Orchestrator's HTTPS endpoint uses a self-signed cert.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

ORCH_URL = os.environ.get("ORCH_URL", "https://orchestrator:8935")
ORCH_SECRET = os.environ.get("ORCH_SECRET", "orch-secret")
CAPABILITY_NAME = os.environ.get("CAPABILITY_NAME", "hello-world")
CAPABILITY_URL = os.environ.get("CAPABILITY_URL", "http://hello_world:5000")
MAX_ATTEMPTS = int(os.environ.get("MAX_ATTEMPTS", "30"))

data = {
"name": CAPABILITY_NAME,
"url": CAPABILITY_URL,
"capacity": 1,
"price_per_unit": 0,
"price_scaling": 1,
"currency": "wei",
}
headers = {"Authorization": ORCH_SECRET}

for attempt in range(1, MAX_ATTEMPTS + 1):
try:
r = requests.post(
f"{ORCH_URL}/capability/register",
json=data,
headers=headers,
verify=False,
timeout=5,
)
if r.status_code == 200:
print(f"registered {CAPABILITY_NAME} -> {CAPABILITY_URL}")
sys.exit(0)
print(f"attempt {attempt}: status={r.status_code} body={r.text!r}")
except Exception as exc:
print(f"attempt {attempt}: {exc}")
time.sleep(2)

print("registration failed after timeout")
sys.exit(1)
39 changes: 39 additions & 0 deletions examples/runner/hello_world/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/usr/bin/env bash
# E2E: send a request through the gateway, assert the response from the
# hello_world container comes back through the orchestrator.

set -euo pipefail
cd "$(dirname "$0")"

GATEWAY_URL="${GATEWAY_URL:-http://localhost:9935}"
NAME="${NAME:-livepeer}"
EXPECTED_MSG="hello, ${NAME}"

echo "Waiting for capability registration..."
for i in $(seq 1 60); do
if docker logs register_capability 2>&1 | grep -q "registered hello-world"; then
echo " registered."
break
fi
sleep 2
done

# TODO: swap curl for a livepeer_gateway batch caller (post PR #6) — drops
# the gateway service from compose.
LIVEPEER_HDR=$(printf '%s' '{"request":"{}","parameters":"{}","capability":"hello-world","timeout_seconds":30}' | base64 -w0)

echo "Sending request through gateway..."
RESPONSE=$(curl -fsS -X POST "${GATEWAY_URL}/process/request/predict" \
-H "Livepeer: ${LIVEPEER_HDR}" \
-H "Content-Type: application/json" \
-d "{\"name\":\"${NAME}\"}")

echo "Response: ${RESPONSE}"

if echo "${RESPONSE}" | grep -qE "\"message\"[[:space:]]*:[[:space:]]*\"${EXPECTED_MSG}\""; then
echo "PASS"
exit 0
fi

echo "FAIL"
exit 1
26 changes: 26 additions & 0 deletions examples/runner/image_upscale/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
FROM python:3.11-slim

WORKDIR /app

# SDK install (in-repo source until livepeer-gateway publishes; will collapse
# to a single `pip install livepeer-gateway` line once on PyPI).
COPY pyproject.toml README ./
COPY src /app/src
RUN pip install --no-cache-dir /app

# Pipeline-specific deps. The requirements.txt sets --extra-index-url to
# pull the CPU-only torch wheel (~200 MB vs ~5 GB for the CUDA variant).
COPY examples/runner/image_upscale/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Bake model weights at build time so setup() loads from local disk in
# milliseconds.
COPY examples/runner/image_upscale/prepare_models.py /app/prepare_models.py
RUN python /app/prepare_models.py

# Pipeline code last so edits don't invalidate the bake layer above.
COPY examples/runner/image_upscale/pipeline.py /app/pipeline.py

EXPOSE 5000

CMD ["python", "/app/pipeline.py"]
Loading