diff --git a/cmd/obol/buy.go b/cmd/obol/buy.go index fb19d923..dbba531c 100644 --- a/cmd/obol/buy.go +++ b/cmd/obol/buy.go @@ -59,6 +59,7 @@ func buyCommand(cfg *config.Config) *cli.Command { Usage: "Buy access to remote services via x402 micropayments", Commands: []*cli.Command{ buyInferenceCommand(cfg), + buyDatasetCommand(cfg), }, } } diff --git a/cmd/obol/dataset.go b/cmd/obol/dataset.go new file mode 100644 index 00000000..a0744d78 --- /dev/null +++ b/cmd/obol/dataset.go @@ -0,0 +1,495 @@ +package main + +// obol dataset — owner side of a versioned, membership-gated dataset offer. +// +// obol dataset from --name ingest a bundle as a new +// signed version (creates v1). +// obol dataset version --bundle append the next signed version. +// obol dataset publish host the artifact server on +// this machine + a Cloudflare +// tunnel; gate every byte. +// obol dataset approve admit a worker (membership). +// obol dataset verify walk the signed version chain. +// obol dataset status versions + members. +// +// The artifact server is the host gateway (same spirit as `obol sell +// inference` / `obol research publish`): it runs on the owner's machine, never +// in the cluster, and reaches remote buyers over the real internet via +// Cloudflare. Bytes never leave the host un-gated. + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "os/signal" + "path/filepath" + "strconv" + "strings" + "syscall" + "time" + + "github.com/ObolNetwork/obol-stack/internal/config" + "github.com/ObolNetwork/obol-stack/internal/dataset" + "github.com/urfave/cli/v3" +) + +// datasetState lets approve/status reach a running publish server. +type datasetState struct { + ID string `json:"id"` + LocalAddr string `json:"local_addr"` + PublicURL string `json:"public_url"` + OwnerToken string `json:"owner_token"` +} + +func datasetCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "dataset", + Usage: "Publish and sell versioned, membership-gated datasets", + Commands: []*cli.Command{ + datasetFromCommand(cfg), + datasetVersionCommand(cfg), + datasetPublishCommand(cfg), + datasetApproveCommand(cfg), + datasetVerifyCommand(cfg), + datasetStatusCommand(cfg), + }, + } +} + +func datasetFromCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "from", + Usage: "Ingest a dataset bundle directory as a new signed version", + ArgsUsage: "", + Flags: []cli.Flag{&cli.StringFlag{Name: "name", Usage: "Dataset id", Required: true}}, + Action: func(_ context.Context, cmd *cli.Command) error { + if cmd.NArg() != 1 { + return fmt.Errorf("bundle directory required: obol dataset from --name ") + } + return appendDatasetVersion(cfg, cmd, strings.TrimSpace(cmd.String("name")), cmd.Args().First()) + }, + } +} + +func datasetVersionCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "version", + Usage: "Append the next signed version of an existing dataset", + ArgsUsage: "", + Flags: []cli.Flag{&cli.StringFlag{Name: "bundle", Usage: "New bundle directory", Required: true}}, + Action: func(_ context.Context, cmd *cli.Command) error { + if cmd.NArg() != 1 { + return fmt.Errorf("dataset id required: obol dataset version --bundle ") + } + id := strings.TrimSpace(cmd.Args().First()) + if _, err := os.Stat(datasetStorePath(cfg, id)); err != nil { + return fmt.Errorf("dataset %q not found — create it with 'obol dataset from'", id) + } + return appendDatasetVersion(cfg, cmd, id, cmd.String("bundle")) + }, + } +} + +func appendDatasetVersion(cfg *config.Config, cmd *cli.Command, id, bundleDir string) error { + u := getUI(cmd) + if id == "" { + return fmt.Errorf("dataset id required") + } + key, err := dataset.LoadOrCreateKey(datasetKeyPath(cfg, id)) + if err != nil { + return err + } + signer := dataset.NewEthSigner(key) + + manifestHash, artifactPath, fileHash, size, err := dataset.ReadBundle(bundleDir) + if err != nil { + return err + } + + store := dataset.NewStore(datasetStorePath(cfg, id)) + st, err := store.Load() + if err != nil { + return err + } + log := dataset.LogFromVersions(st.Versions) + v, err := log.Append(manifestHash, fileHash, size, signer, time.Now()) + if err != nil { + return err + } + + st.ID, st.GroupID, st.Versions = id, id, log.Versions() + if st.Artifacts == nil { + st.Artifacts = map[int]string{} + } + st.Artifacts[v.Seq] = artifactPath + if err := store.Save(st); err != nil { + return err + } + + u.Successf("Dataset %q version %d recorded", id, v.Seq) + u.Infof("Manifest hash: %s", v.ManifestHash) + u.Infof("File hash: %s", v.FileHash) + u.Infof("Size: %d bytes", v.Size) + u.Infof("Owner: %s", signer.SignerID()) + u.Dim("Publish it with: obol dataset publish " + id) + return nil +} + +func datasetPublishCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "publish", + Usage: "Host the dataset's artifact server and expose it over a Cloudflare tunnel", + ArgsUsage: "", + Flags: []cli.Flag{ + &cli.StringFlag{Name: "membership", Usage: "open | invite", Value: "invite"}, + &cli.IntFlag{Name: "port", Usage: "Local port (0 = pick a free one)", Value: 0}, + &cli.BoolFlag{Name: "no-tunnel", Usage: "Serve locally only"}, + }, + Action: func(ctx context.Context, cmd *cli.Command) error { + u := getUI(cmd) + if cmd.NArg() != 1 { + return fmt.Errorf("dataset id required: obol dataset publish ") + } + id := strings.TrimSpace(cmd.Args().First()) + + key, err := dataset.LoadOrCreateKey(datasetKeyPath(cfg, id)) + if err != nil { + return err + } + signer := dataset.NewEthSigner(key) + + store := dataset.NewStore(datasetStorePath(cfg, id)) + st, err := store.Load() + if err != nil { + return err + } + if len(st.Versions) == 0 { + return fmt.Errorf("dataset %q has no versions — run 'obol dataset from' first", id) + } + artifacts := dataset.NewFileArtifacts() + for seq, path := range st.Artifacts { + artifacts.Set(seq, path) + } + ents := dataset.NewEntitlements() + ents.Load(st.Entitlements) + + ownerToken, err := randomToken("obol-dataset-owner-") + if err != nil { + return err + } + srv := dataset.NewServer(dataset.Config{ + ID: id, + Membership: cmd.String("membership"), + OwnerToken: ownerToken, + OwnerSigner: signer.SignerID(), + Log: dataset.LogFromVersions(st.Versions), + Ents: ents, + Store: store, + Artifacts: artifacts, + Payments: forwardedPayment{}, + }) + + ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.Int("port"))) + if err != nil { + return fmt.Errorf("listen: %w", err) + } + localAddr := "http://" + ln.Addr().String() + httpSrv := &http.Server{Handler: srv.Handler()} + go func() { _ = httpSrv.Serve(ln) }() + + publicURL := localAddr + var tunnel *exec.Cmd + if !cmd.Bool("no-tunnel") { + u.Info("Opening Cloudflare tunnel …") + if turl, tcmd, terr := startQuickTunnel(ctx, ln.Addr().String()); terr != nil { + u.Warnf("tunnel failed (%v) — serving locally only at %s", terr, localAddr) + } else { + publicURL, tunnel = turl, tcmd + } + } + + head, _ := dataset.LogFromVersions(st.Versions).Head() + _ = writeDatasetState(cfg, datasetState{ID: id, LocalAddr: localAddr, PublicURL: publicURL, OwnerToken: ownerToken}) + + u.Successf("Dataset %q published (head version %d)", id, head.Seq) + u.Infof("Public URL: %s", publicURL) + u.Infof("Owner: %s", signer.SignerID()) + u.Infof("Membership: %s", cmd.String("membership")) + u.Blank() + u.Bold("Buyers fetch with:") + u.Printf(" obol buy dataset %s --id %s --member-token ", publicURL, id) + if cmd.String("membership") == dataset.MembershipInvite { + u.Dim("Admit a worker's printed code: obol dataset approve ") + } + u.Dim("Ctrl-C to stop.") + + sigCtx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) + defer stop() + <-sigCtx.Done() + + u.Info("Stopping …") + shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = httpSrv.Shutdown(shutCtx) + if tunnel != nil && tunnel.Process != nil { + _ = tunnel.Process.Kill() + } + _ = removeDatasetState(cfg, id) + return nil + }, + } +} + +func datasetApproveCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "approve", + Usage: "Admit a worker to a dataset (the membership decision)", + ArgsUsage: "", + Flags: []cli.Flag{&cli.StringFlag{Name: "dataset", Usage: "Dataset id (defaults to the only running one)"}}, + Action: func(ctx context.Context, cmd *cli.Command) error { + u := getUI(cmd) + if cmd.NArg() != 1 { + return fmt.Errorf("user code required: obol dataset approve ") + } + st, err := loadDatasetState(cfg, cmd.String("dataset")) + if err != nil { + return err + } + body, _ := json.Marshal(map[string]string{"user_code": cmd.Args().First()}) + resp, err := datasetOwnerReq(ctx, st, http.MethodPost, "/auth/device/approve", body) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + return fmt.Errorf("approve failed (%d): %s", resp.StatusCode, strings.TrimSpace(string(b))) + } + u.Successf("Approved %s into %q", cmd.Args().First(), st.ID) + return nil + }, + } +} + +func datasetVerifyCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "verify", + Usage: "Walk a dataset's signed version chain (offline)", + ArgsUsage: "", + Action: func(_ context.Context, cmd *cli.Command) error { + u := getUI(cmd) + if cmd.NArg() != 1 { + return fmt.Errorf("dataset id required: obol dataset verify ") + } + id := strings.TrimSpace(cmd.Args().First()) + key, err := dataset.LoadOrCreateKey(datasetKeyPath(cfg, id)) + if err != nil { + return err + } + owner := dataset.NewEthSigner(key).SignerID() + st, err := dataset.NewStore(datasetStorePath(cfg, id)).Load() + if err != nil { + return err + } + log := dataset.LogFromVersions(st.Versions) + if err := log.Verify(dataset.EthVerifier{}, owner); err != nil { + return fmt.Errorf("chain INVALID: %w", err) + } + head, _ := log.Head() + u.Successf("Chain valid: %d version(s), head v%d, owner %s", log.Len(), head.Seq, owner) + return nil + }, + } +} + +func datasetStatusCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "status", + Usage: "Show versions and member count", + ArgsUsage: "", + Action: func(ctx context.Context, cmd *cli.Command) error { + u := getUI(cmd) + if cmd.NArg() != 1 { + return fmt.Errorf("dataset id required: obol dataset status ") + } + id := strings.TrimSpace(cmd.Args().First()) + st, err := dataset.NewStore(datasetStorePath(cfg, id)).Load() + if err != nil { + return err + } + log := dataset.LogFromVersions(st.Versions) + u.Bold(fmt.Sprintf("Dataset %s — %d version(s)", id, log.Len())) + for _, v := range log.Versions() { + u.Printf(" v%d %s %d bytes (%s…)", v.Seq, v.ManifestHash[:12], v.Size, v.Signature[:12]) + } + u.Infof("Entitled members: %d", len(st.Entitlements)) + return nil + }, + } +} + +// buyDatasetCommand is added to `obol buy` so buyers fetch with +// `obol buy dataset --id --member-token `. +func buyDatasetCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "dataset", + Usage: "Download a versioned dataset over HTTP, verifying its whole-file hash", + ArgsUsage: "", + Flags: []cli.Flag{ + &cli.StringFlag{Name: "id", Usage: "Dataset id (or embed /dataset/ in the URL)"}, + &cli.IntFlag{Name: "version", Usage: "Version to fetch (0 = head)"}, + &cli.StringFlag{Name: "member-token", Usage: "Member token (owner-issued or payment-minted)", Required: true}, + &cli.StringFlag{Name: "out", Usage: "Output file (default -v.jsonl)"}, + }, + Action: func(ctx context.Context, cmd *cli.Command) error { + u := getUI(cmd) + if cmd.NArg() != 1 { + return fmt.Errorf("seller URL required: obol buy dataset --id --member-token ") + } + base, id := splitDatasetURL(cmd.Args().First(), cmd.String("id")) + if id == "" { + return fmt.Errorf("dataset id required (pass --id or a /dataset/ URL)") + } + out := cmd.String("out") + if out == "" { + v := cmd.Int("version") + if v == 0 { + v = 1 + } + out = fmt.Sprintf("%s-v%d.jsonl", id, v) + } + u.Infof("Fetching %s (version %v) → %s", id, orHead(cmd.Int("version")), out) + res, err := dataset.Fetch(ctx, dataset.FetchOptions{ + BaseURL: base, ID: id, Version: cmd.Int("version"), + Token: cmd.String("member-token"), OutPath: out, + }) + if err != nil { + return err + } + if res.Resumed { + u.Dim("(resumed an interrupted download)") + } + u.Successf("Verified v%d: %d bytes, file hash %s", res.Version, res.Bytes, res.FileHash) + u.Dim("Manifest: " + res.ManifestHash) + return nil + }, + } +} + +// --- payment validation (behind the edge x402-verifier) --- + +// forwardedPayment trusts the edge x402-verifier to have proven a settled +// payment upstream; it extracts the paid version/amount from forwarded +// headers. It is only reachable on the membership-gated /join/paid route +// (never a raw public route). +type forwardedPayment struct{} + +func (forwardedPayment) Validate(r *http.Request, _ string) (int, string, error) { + if r.Header.Get("X-Payment-Response") == "" && r.Header.Get("X-Payment") == "" { + return 0, "", fmt.Errorf("no settled payment forwarded") + } + v, _ := strconv.Atoi(r.Header.Get("X-Dataset-Version")) + if v < 1 { + v = 1 + } + return v, r.Header.Get("X-Dataset-Atomic"), nil +} + +// --- state + url helpers --- + +func datasetServeDir(cfg *config.Config) string { return filepath.Join(cfg.ConfigDir, "dataset-serve") } + +func datasetKeyPath(cfg *config.Config, id string) string { + return filepath.Join(datasetServeDir(cfg), id+".key") +} + +func datasetStorePath(cfg *config.Config, id string) string { + return filepath.Join(datasetServeDir(cfg), id+".store.json") +} + +func writeDatasetState(cfg *config.Config, st datasetState) error { + if err := os.MkdirAll(datasetServeDir(cfg), 0o700); err != nil { + return err + } + b, _ := json.MarshalIndent(st, "", " ") + return os.WriteFile(filepath.Join(datasetServeDir(cfg), st.ID+".state.json"), b, 0o600) +} + +func removeDatasetState(cfg *config.Config, id string) error { + return os.Remove(filepath.Join(datasetServeDir(cfg), id+".state.json")) +} + +func loadDatasetState(cfg *config.Config, id string) (datasetState, error) { + dir := datasetServeDir(cfg) + if id != "" { + return readDatasetStateFile(filepath.Join(dir, id+".state.json")) + } + entries, err := os.ReadDir(dir) + if err != nil { + return datasetState{}, fmt.Errorf("no running dataset (publish one first)") + } + var found []string + for _, e := range entries { + if strings.HasSuffix(e.Name(), ".state.json") { + found = append(found, e.Name()) + } + } + switch len(found) { + case 0: + return datasetState{}, fmt.Errorf("no running dataset (publish one first)") + case 1: + return readDatasetStateFile(filepath.Join(dir, found[0])) + default: + return datasetState{}, fmt.Errorf("multiple datasets running — pass --dataset ") + } +} + +func readDatasetStateFile(path string) (datasetState, error) { + b, err := os.ReadFile(path) + if err != nil { + return datasetState{}, fmt.Errorf("read dataset state: %w", err) + } + var st datasetState + if err := json.Unmarshal(b, &st); err != nil { + return datasetState{}, err + } + return st, nil +} + +func datasetOwnerReq(ctx context.Context, st datasetState, method, path string, body []byte) (*http.Response, error) { + var r io.Reader + if body != nil { + r = strings.NewReader(string(body)) + } + req, _ := http.NewRequestWithContext(ctx, method, st.LocalAddr+path, r) + req.Header.Set("Authorization", "Bearer "+st.OwnerToken) + req.Header.Set("Content-Type", "application/json") + return http.DefaultClient.Do(req) +} + +// splitDatasetURL separates a base URL from an embedded /dataset/ path. +func splitDatasetURL(raw, flagID string) (base, id string) { + raw = strings.TrimRight(raw, "/") + if i := strings.Index(raw, "/dataset/"); i >= 0 { + base = raw[:i] + rest := strings.TrimPrefix(raw[i:], "/dataset/") + id = strings.SplitN(rest, "/", 2)[0] + if flagID != "" { + id = flagID + } + return base, id + } + return raw, flagID +} + +func orHead(v int) any { + if v == 0 { + return "head" + } + return v +} diff --git a/cmd/obol/main.go b/cmd/obol/main.go index a6751acb..63439348 100644 --- a/cmd/obol/main.go +++ b/cmd/obol/main.go @@ -326,6 +326,7 @@ GLOBAL OPTIONS:{{template "visibleFlagTemplate" .}}{{end}} sellCommand(cfg), buyCommand(cfg), researchCommand(cfg), + datasetCommand(cfg), modelCommand(cfg), { Name: "app", diff --git a/docs/guides/monetize-dataset.md b/docs/guides/monetize-dataset.md new file mode 100644 index 00000000..e7d33266 --- /dev/null +++ b/docs/guides/monetize-dataset.md @@ -0,0 +1,121 @@ +# Sell a dataset (and fine-tune on it) + +This guide takes a local dataset from raw bytes to a **versioned, content-addressed, +membership-gated product** that other obol-stacks discover, pay for, download +(verifying every byte), and fine-tune on — with provenance from the model back +to the exact dataset version. + +The dataset is **one artifact, two uses**: the same `sft.jsonl` is your local +fine-tune input *and* the bytes you sell. Nothing is re-exported. + +## 0. Prerequisites + +A dataset *bundle directory* containing a `manifest.json` (a content-address +`hash` + a `files` list) and a `*.jsonl` training artifact: + +``` +my-bundle/ + manifest.json {"hash":"", "files":["sft.jsonl"]} + sft.jsonl chat/instruction records, one JSON object per line +``` + +## 1. Anonymize (before anything leaves the host) + +```bash +SKILLS=${OBOL_SKILLS_DIR:-~/.config/obol/skills} +python3 "$SKILLS/dataset-anonymize/scripts/anonymize.py" \ + my-bundle/sft.jsonl my-bundle/sft.jsonl --report +``` + +The default regex redactor masks emails, IPs, keys, card/SSN-shaped numbers, +home paths, and phones into typed placeholders. For ML-grade detection set +`OBOL_ANONYMIZER_MODEL` to a Hugging Face token-classification PII model. See +the `dataset-anonymize` skill. + +## 2. Record a signed version + +```bash +obol dataset from my-bundle --name pi-sessions +``` + +This reads the bundle, computes the artifact's whole-file SHA-256, and appends +a **signed** `DatasetVersion` (v1) to the dataset's version log — chained to +its predecessor, signed by your owner key (the address buyers pin). Append a new +snapshot later with `obol dataset version pi-sessions --bundle my-bundle-v2`. + +Walk the chain offline at any time: + +```bash +obol dataset verify pi-sessions # rejects any reorder/tamper/middle-removal +``` + +## 3. Publish (host + tunnel + gate) + +```bash +obol dataset publish pi-sessions --membership invite +``` + +Starts the artifact server on your machine and a Cloudflare tunnel. **Bytes +never leave un-gated**: every `/dataset//download` requires a member token +*and* checks the token is entitled to the requested version. The server streams +with HTTP Range (resumable) and commits the whole-file hash on `200` and `206` +alike. + +Two ways a caller holds a member token: + +- **Pre-approved worker** — joins via device-auth; you run + `obol dataset approve `. Gets full (head) access. +- **Anonymous market buyer** — pays the priced offer; the edge x402 verifier + proves the settled payment, and the server mints a token scoped to exactly + the version paid for (`/join/paid`). Payment *is* the approval; the dataset + stays invisible to non-payers. + +Member tokens are persisted by hash, so paying members survive a host restart +without re-paying. + +## 4. Discovery (federated, no central hub) + +A priced dataset is a `type=dataset` `ServiceOffer`: it rides the existing +controller → route → payment-gate → catalog pipeline unchanged, and appears in +the seller's `/api/services.json` with its pinned version metadata +(`datasetManifestHash`, `datasetVersion`, `datasetSizeBytes`). The obol-router +federates that catalog across stacks **type-agnostically** — a dataset is just +another catalog entry — and the on-chain registration is indexed for +discovery. No central hub: each operator owns their dataset; discovery is the +union of everyone's catalogs. + +## 5. Buy — download + verify + +```bash +obol buy dataset https:// --id pi-sessions --version 1 \ + --member-token --out pi-sessions-v1.jsonl +``` + +The client streams the artifact (resuming from a `.part` if interrupted) and +recomputes the whole-file SHA-256, asserting it equals the server's +`X-Dataset-File-Hash`. A mismatch or a missing commitment **fails closed** — no +unverifiable file is ever finalized. + +## 6. Fine-tune (one contract, many backends) + +```bash +python3 "$SKILLS/finetune-backend/scripts/runner.py" \ + --backend unsloth \ + --dataset pi-sessions-v1.jsonl --base-model unsloth/Qwen2.5-0.5B \ + --manifest-hash --out ./run +``` + +Every backend (`mlx-lora`, `unsloth`, `axolotl`, `torchtune`, or `mock` for a +no-GPU contract check) reads the same JSONL. The runner writes +`run.manifest` binding `dataset_hash` to the exact version you bought — the +provenance link from a fine-tuned model back to its data. That is also the +deliverable shape the `finetune@v1` bounty task declares, so a standalone run +and a verified/bounty run stay consistent. + +## Invariants + +- Only the membership-gated route class is ever tunnel-exposed; dataset bytes + never leave the host without a valid, version-scoped member token. +- The version log is signed by the owner key and chained; verification is + offline and detects reorder/tamper. +- The controller never signs or holds a key; settlement is on-chain canonical. diff --git a/flows/hf-surface-smoke.sh b/flows/hf-surface-smoke.sh new file mode 100755 index 00000000..0dcb4e1c --- /dev/null +++ b/flows/hf-surface-smoke.sh @@ -0,0 +1,201 @@ +#!/usr/bin/env bash +# HF-surface smoke — validate the "decentralised Hugging Face" surfaces the +# obol-stack ships, end to end: +# +# 1. Dataset Hub anonymize -> sign a version -> publish (gated) -> buy +# (resumable, whole-file-hash-verified download) +# 2. Inference a type=inference offer in the federated catalog +# 3. Fine-tuning run a backend over the BOUGHT dataset on a real GPU box +# (spark), producing run.manifest bound to the dataset's +# content-address (provenance) +# 4. Discovery federate the seller catalog through obol-router and +# assert both the dataset and inference offers surface +# 5. Indexer cross-check against the obol-exex ERC-8004 indexer +# +# Each surface is independent; a missing prerequisite SKIPs, never aborts. +# +# Overridable env: +# OBOL_BIN path to a built obol (default: build from this tree) +# ROUTER_BIN path to a built obol-router (default: /tmp/obol-router) +# SPARK ssh host for the fine-tune (default: spark1; "" to skip) +# INDEXER_DIR obol-exex-indexer checkout (default: sibling repo) +set -uo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +WORK="$(mktemp -d)" +trap 'jobs -p | xargs -r kill 2>/dev/null; rm -rf "$WORK"' EXIT + +OBOL_BIN="${OBOL_BIN:-$WORK/obol}" +ROUTER_BIN="${ROUTER_BIN:-/tmp/obol-router}" +SPARK="${SPARK-spark1}" +INDEXER_DIR="${INDEXER_DIR:-$ROOT/../obol-exex-indexer}" +SKILLS="$ROOT/internal/embed/skills" + +declare -a RESULTS +pass() { RESULTS+=("PASS $1"); echo " ✓ $1"; } +skip() { RESULTS+=("SKIP $1 — $2"); echo " - SKIP $1 — $2"; } +fail() { RESULTS+=("FAIL $1 — $2"); echo " ✗ FAIL $1 — $2"; } + +section() { echo; echo "=== $1 ==="; } + +# --- build obol if needed --- +if [ ! -x "$OBOL_BIN" ]; then + echo "Building obol …" + ( cd "$ROOT" && go build -o "$OBOL_BIN" ./cmd/obol ) || { echo "obol build failed"; exit 1; } +fi +export OBOL_CONFIG_DIR="$WORK/config" +mkdir -p "$OBOL_CONFIG_DIR" + +DS_ID="pi-sessions" +DS_PORT=18951 +SELLER_PORT=18961 +ROUTER_PORT=18971 + +# Free our ports from any orphan left by a prior aborted run (a subshelled +# server can outlive its parent's job-kill), so federation never reads a +# stale 404. +for p in "$DS_PORT" "$SELLER_PORT" "$ROUTER_PORT"; do + lsof -nP -iTCP:"$p" -sTCP:LISTEN -t 2>/dev/null | xargs -r kill 2>/dev/null +done + +# =========================================================================== +section "Surface 1 — Dataset Hub (anonymize → sign → publish → buy → verify)" +# =========================================================================== +BUNDLE="$WORK/bundle"; mkdir -p "$BUNDLE" +cat > "$BUNDLE/raw.jsonl" <<'EOF' +{"messages":[{"role":"user","content":"email me at alice@example.com from 10.0.0.7"},{"role":"assistant","content":"path /Users/bob/notes.txt, key sk-ABCDEF0123456789abcdef"}]} +{"messages":[{"role":"user","content":"summarize the design doc"},{"role":"assistant","content":"the stack ships a dataset hub, inference, and fine-tuning"}]} +EOF + +if python3 "$SKILLS/dataset-anonymize/scripts/anonymize.py" "$BUNDLE/raw.jsonl" "$BUNDLE/sft.jsonl" >/dev/null 2>&1 \ + && ! grep -q 'alice@example.com' "$BUNDLE/sft.jsonl"; then + pass "1a anonymize — PII masked, no raw leak" +else + fail "1a anonymize" "PII leaked or script error" +fi + +HASH=$(shasum -a 256 "$BUNDLE/sft.jsonl" | awk '{print $1}') +printf '{"hash":"%s","files":["sft.jsonl"]}\n' "$HASH" > "$BUNDLE/manifest.json" + +if "$OBOL_BIN" dataset from "$BUNDLE" --name "$DS_ID" >/dev/null 2>&1 \ + && "$OBOL_BIN" dataset verify "$DS_ID" 2>&1 | grep -q 'Chain valid'; then + pass "1b sign + verify — signed version chain valid" +else + fail "1b sign+verify" "version not recorded or chain invalid" +fi + +MANIFEST_HASH=$(python3 -c "import json;print(json.load(open('$OBOL_CONFIG_DIR/dataset-serve/$DS_ID.store.json'))['versions'][0]['manifestHash'])") + +"$OBOL_BIN" dataset publish "$DS_ID" --membership open --port "$DS_PORT" --no-tunnel >/dev/null 2>&1 & +curl -sf --retry 25 --retry-connrefused --retry-delay 1 "http://127.0.0.1:$DS_PORT/healthz" >/dev/null +OWNER=$(python3 -c "import json;print(json.load(open('$OBOL_CONFIG_DIR/dataset-serve/$DS_ID.state.json'))['owner_token'])" 2>/dev/null) + +if [ -n "$OWNER" ] && "$OBOL_BIN" buy dataset "http://127.0.0.1:$DS_PORT" --id "$DS_ID" --version 1 \ + --member-token "$OWNER" --out "$WORK/bought.jsonl" >/dev/null 2>&1 \ + && diff -q "$WORK/bought.jsonl" "$BUNDLE/sft.jsonl" >/dev/null; then + pass "1c buy — resumable download byte-identical + hash-verified" +else + fail "1c buy" "download/verify mismatch" +fi + +# =========================================================================== +section "Surface 2 — Inference offer (federated catalog entry)" +# =========================================================================== +# Build a seller /api/services.json carrying BOTH a type=dataset entry (the +# real signed version above) and a type=inference entry. +mkdir -p "$WORK/seller/api" +DS_SIZE=$(wc -c < "$BUNDLE/sft.jsonl" | tr -d ' ') +cat > "$WORK/seller/api/services.json" </dev/null 2>&1; then + skip "3 fine-tune" "$SPARK unreachable" +else + RDIR="/tmp/obol-ft-smoke.$$" + ssh "$SPARK" "mkdir -p $RDIR/out" >/dev/null 2>&1 + scp -q "$WORK/bought.jsonl" "$SPARK:$RDIR/ds.jsonl" + scp -q "$SKILLS/finetune-backend/scripts/runner.py" "$SPARK:$RDIR/runner.py" + # mock backend: no framework needed, validates contract + provenance on real hw + if ssh "$SPARK" "cd $RDIR && python3 runner.py --backend mock --dataset ds.jsonl --base-model qwen2.5-0.5b --manifest-hash $MANIFEST_HASH --out out" >/dev/null 2>&1; then + BOUND=$(ssh "$SPARK" "python3 -c \"import json;print(json.load(open('$RDIR/out/run.manifest'))['dataset_hash'])\"" 2>/dev/null) + ARCH=$(ssh "$SPARK" 'uname -m' 2>/dev/null) + if [ "$BOUND" = "$MANIFEST_HASH" ]; then + pass "3 fine-tune on $SPARK ($ARCH) — run.manifest dataset_hash == bought manifestHash" + else + fail "3 fine-tune" "provenance mismatch ($BOUND != $MANIFEST_HASH)" + fi + ssh "$SPARK" "rm -rf $RDIR" >/dev/null 2>&1 + else + skip "3 fine-tune" "runner failed on $SPARK" + fi +fi + +# =========================================================================== +section "Surface 4 — Discovery via obol-router (federation)" +# =========================================================================== +if [ ! -x "$ROUTER_BIN" ]; then + skip "4 router discovery" "obol-router not built at $ROUTER_BIN" +else + ( cd "$WORK/seller" && exec python3 -m http.server "$SELLER_PORT" >/dev/null 2>&1 ) & + curl -sf --retry 15 --retry-connrefused --retry-delay 1 "http://127.0.0.1:$SELLER_PORT/api/services.json" >/dev/null + # The router federates members' /api/services.json and serves the merge at + # GET /api/services.json. PAY_TO/FACILITATOR/BUYER_KEY are required for its + # x402 routing path but unused by this discovery-only check (dummy values). + OBOL_ROUTER_MEMBERS="seller1=http://127.0.0.1:$SELLER_PORT" \ + PORT="$ROUTER_PORT" \ + ROUTER_CHAIN="base-sepolia" \ + ROUTER_PAY_TO="0x1111111111111111111111111111111111111111" \ + ROUTER_FACILITATOR_URL="http://127.0.0.1:1" \ + ROUTER_BUYER_KEY_HEX="ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" \ + "$ROUTER_BIN" >"$WORK/router.log" 2>&1 & + curl -sf --retry 20 --retry-connrefused --retry-delay 1 "http://127.0.0.1:$ROUTER_PORT/healthz" >/dev/null 2>&1 + MERGED=$(curl -sf "http://127.0.0.1:$ROUTER_PORT/api/services.json" 2>/dev/null) + if echo "$MERGED" | python3 -c "import json,sys;d=json.load(sys.stdin);t=[e.get('type') for e in (d if isinstance(d,list) else d.get('services',d.get('offers',[])))];assert 'dataset' in t and 'inference' in t,t" 2>/dev/null; then + pass "4 router federated the dataset AND inference offers" + else + fail "4 router discovery" "merged catalog missing dataset/inference (got: $(echo "$MERGED" | head -c 120))" + fi +fi + +# =========================================================================== +section "Surface 5 — Cross-check via the obol-exex ERC-8004 indexer" +# =========================================================================== +if [ ! -d "$INDEXER_DIR" ]; then + skip "5 indexer" "obol-exex-indexer not found at $INDEXER_DIR" +elif ! command -v cargo >/dev/null 2>&1; then + skip "5 indexer" "cargo not installed" +else + if ( cd "$INDEXER_DIR" && cargo test -p indexer-core --quiet ) >/dev/null 2>&1; then + pass "5 indexer-core tests green (ERC-8004 registration parsing / feeds parity)" + else + fail "5 indexer" "indexer-core tests failed" + fi +fi + +# =========================================================================== +section "Summary" +# =========================================================================== +printf '%s\n' "${RESULTS[@]}" +FAILS=$(printf '%s\n' "${RESULTS[@]}" | grep -c '^FAIL' || true) +echo +if [ "$FAILS" -eq 0 ]; then + echo "HF-surface smoke: no failures ✓" + exit 0 +else + echo "HF-surface smoke: $FAILS failure(s) ✗" + exit 1 +fi diff --git a/internal/dataset/artifacts.go b/internal/dataset/artifacts.go new file mode 100644 index 00000000..adef340a --- /dev/null +++ b/internal/dataset/artifacts.go @@ -0,0 +1,56 @@ +package dataset + +import ( + "fmt" + "io" + "os" + "sync" + "time" +) + +// Artifacts resolves the raw bytes of a dataset version for serving. The +// returned reader is seekable so the HTTP layer (http.ServeContent) can honor +// Range requests; the caller closes it. +type Artifacts interface { + Open(version int) (content io.ReadSeeker, modtime time.Time, closeFn func() error, err error) +} + +// FileArtifacts serves each version from a file on disk. Versions are +// registered as they are published (Set), so a snapshot's bytes stay pinned +// to the path that priced and signed it. +type FileArtifacts struct { + mu sync.RWMutex + paths map[int]string +} + +// NewFileArtifacts returns an empty file-backed artifact store. +func NewFileArtifacts() *FileArtifacts { + return &FileArtifacts{paths: map[int]string{}} +} + +// Set registers the file path serving a version. +func (f *FileArtifacts) Set(version int, path string) { + f.mu.Lock() + defer f.mu.Unlock() + f.paths[version] = path +} + +// Open implements Artifacts. +func (f *FileArtifacts) Open(version int) (io.ReadSeeker, time.Time, func() error, error) { + f.mu.RLock() + path, ok := f.paths[version] + f.mu.RUnlock() + if !ok { + return nil, time.Time{}, nil, fmt.Errorf("dataset: no artifact registered for version %d", version) + } + file, err := os.Open(path) + if err != nil { + return nil, time.Time{}, nil, fmt.Errorf("dataset: open artifact v%d: %w", version, err) + } + info, err := file.Stat() + if err != nil { + file.Close() + return nil, time.Time{}, nil, fmt.Errorf("dataset: stat artifact v%d: %w", version, err) + } + return file, info.ModTime(), file.Close, nil +} diff --git a/internal/dataset/bundle.go b/internal/dataset/bundle.go new file mode 100644 index 00000000..84765380 --- /dev/null +++ b/internal/dataset/bundle.go @@ -0,0 +1,83 @@ +package dataset + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "strings" +) + +// BundleManifest is the minimal shape read from a dataset bundle directory's +// manifest.json: a content-address hash and the list of artifact file names. +// (The bundle is produced by an external dataset exporter; only this +// generic envelope is consumed here.) +type BundleManifest struct { + Hash string `json:"hash"` + Files []string `json:"files"` +} + +// ReadBundle reads /manifest.json, resolves the primary training +// artifact (a .jsonl file, preferring an instruction/sft-style name), and +// returns the manifest hash (the content-address anchor), the artifact's +// absolute path, its whole-file SHA-256, and its byte size. +func ReadBundle(dir string) (manifestHash, artifactPath, fileHash string, size int64, err error) { + data, err := os.ReadFile(filepath.Join(dir, "manifest.json")) + if err != nil { + return "", "", "", 0, fmt.Errorf("dataset: read manifest: %w", err) + } + var m BundleManifest + if err := json.Unmarshal(data, &m); err != nil { + return "", "", "", 0, fmt.Errorf("dataset: parse manifest: %w", err) + } + if len(m.Hash) != 64 { + return "", "", "", 0, fmt.Errorf("dataset: manifest hash must be 64 hex chars, got %d", len(m.Hash)) + } + artifact := pickArtifact(m.Files) + if artifact == "" { + return "", "", "", 0, fmt.Errorf("dataset: no .jsonl artifact listed in manifest") + } + artifactPath = filepath.Join(dir, artifact) + fileHash, size, err = hashFile(artifactPath) + if err != nil { + return "", "", "", 0, err + } + return strings.ToLower(m.Hash), artifactPath, fileHash, size, nil +} + +// pickArtifact chooses the training file: prefer one whose name signals an +// instruction/sft format, else the first .jsonl, else "". +func pickArtifact(files []string) string { + var firstJSONL string + for _, f := range files { + lf := strings.ToLower(f) + if !strings.HasSuffix(lf, ".jsonl") { + continue + } + if firstJSONL == "" { + firstJSONL = f + } + if strings.Contains(lf, "sft") || strings.Contains(lf, "instruct") { + return f + } + } + return firstJSONL +} + +// hashFile returns the lowercase hex SHA-256 and byte size of a file. +func hashFile(path string) (string, int64, error) { + f, err := os.Open(path) + if err != nil { + return "", 0, fmt.Errorf("dataset: open artifact: %w", err) + } + defer f.Close() + h := sha256.New() + n, err := io.Copy(h, f) + if err != nil { + return "", 0, fmt.Errorf("dataset: hash artifact: %w", err) + } + return hex.EncodeToString(h.Sum(nil)), n, nil +} diff --git a/internal/dataset/client.go b/internal/dataset/client.go new file mode 100644 index 00000000..7858b115 --- /dev/null +++ b/internal/dataset/client.go @@ -0,0 +1,131 @@ +package dataset + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "strconv" + "strings" +) + +// FetchResult reports what a verified download produced. +type FetchResult struct { + Version int + ManifestHash string + FileHash string + Bytes int64 + Resumed bool +} + +// FetchOptions configures a verified, resumable dataset download. +type FetchOptions struct { + BaseURL string // e.g. https://host (no trailing slash, no /dataset suffix) + ID string + Version int // 0 = server head + Token string + OutPath string + Client *http.Client +} + +// Fetch downloads a dataset version to OutPath with HTTP Range resume and +// verifies the whole-file SHA-256 against the X-Dataset-File-Hash header the +// server commits on every response. A partial OutPath+".part" from an earlier +// interrupted run is resumed rather than restarted. The verification is done +// once over the reassembled whole file (the hash is of the whole artifact, +// never a chunk). +func Fetch(ctx context.Context, opts FetchOptions) (FetchResult, error) { + if opts.Client == nil { + opts.Client = http.DefaultClient + } + part := opts.OutPath + ".part" + + have := int64(0) + if fi, err := os.Stat(part); err == nil { + have = fi.Size() + } + resumed := have > 0 + + url := strings.TrimSuffix(opts.BaseURL, "/") + "/dataset/" + opts.ID + "/download" + if opts.Version > 0 { + url += "?version=" + strconv.Itoa(opts.Version) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return FetchResult{}, err + } + req.Header.Set("Authorization", "Bearer "+opts.Token) + if have > 0 { + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", have)) + } + + resp, err := opts.Client.Do(req) + if err != nil { + return FetchResult{}, err + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusOK: + // Server ignored Range (or fresh start): rewrite from scratch. + have = 0 + resumed = false + case http.StatusPartialContent: + // Append to the existing .part. + default: + body, _ := io.ReadAll(io.LimitReader(resp.Body, 512)) + return FetchResult{}, fmt.Errorf("dataset: download %s -> %d: %s", url, resp.StatusCode, strings.TrimSpace(string(body))) + } + + fileHash := strings.ToLower(resp.Header.Get("X-Dataset-File-Hash")) + manifestHash := strings.ToLower(resp.Header.Get("X-Dataset-Manifest-Hash")) + version, _ := strconv.Atoi(resp.Header.Get("X-Dataset-Version")) + if fileHash == "" { + return FetchResult{}, fmt.Errorf("dataset: server did not advertise X-Dataset-File-Hash; refusing unverifiable download") + } + + flag := os.O_CREATE | os.O_WRONLY + if have > 0 { + flag |= os.O_APPEND + } else { + flag |= os.O_TRUNC + } + f, err := os.OpenFile(part, flag, 0o644) + if err != nil { + return FetchResult{}, err + } + if _, err := io.Copy(f, resp.Body); err != nil { + f.Close() + return FetchResult{}, fmt.Errorf("dataset: stream body: %w", err) + } + if err := f.Close(); err != nil { + return FetchResult{}, err + } + + // Verify the reassembled whole file against the committed hash. + got, size, err := hashFile(part) + if err != nil { + return FetchResult{}, err + } + if got != fileHash { + return FetchResult{}, fmt.Errorf("dataset: file hash mismatch: got %s, advertised %s (corrupt or tampered)", got, fileHash) + } + if err := os.Rename(part, opts.OutPath); err != nil { + return FetchResult{}, fmt.Errorf("dataset: finalize download: %w", err) + } + return FetchResult{Version: version, ManifestHash: manifestHash, FileHash: fileHash, Bytes: size, Resumed: resumed}, nil +} + +// VerifyFile recomputes a file's SHA-256 and compares it to want. +func VerifyFile(path, want string) error { + got, _, err := hashFile(path) + if err != nil { + return err + } + if got != strings.ToLower(want) { + return fmt.Errorf("dataset: hash mismatch: got %s, want %s", got, want) + } + return nil +} diff --git a/internal/dataset/client_test.go b/internal/dataset/client_test.go new file mode 100644 index 00000000..9f237a3f --- /dev/null +++ b/internal/dataset/client_test.go @@ -0,0 +1,115 @@ +package dataset + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" +) + +func TestFetch_DownloadsAndVerifies(t *testing.T) { + ts := newTestServer(t, MembershipOpen, nil) + httpSrv := httptest.NewServer(ts.srv.Handler()) + defer httpSrv.Close() + + out := filepath.Join(t.TempDir(), "ds-v1.jsonl") + res, err := Fetch(context.Background(), FetchOptions{ + BaseURL: httpSrv.URL, + ID: "ds", + Version: 1, + Token: ownerToken, // owner is a download superuser + OutPath: out, + }) + if err != nil { + t.Fatalf("Fetch: %v", err) + } + if res.Version != 1 || res.Resumed { + t.Errorf("result = %+v, want version 1, not resumed", res) + } + got, err := os.ReadFile(out) + if err != nil { + t.Fatalf("read out: %v", err) + } + if !bytes.Equal(got, ts.bytesV1) { + t.Error("downloaded file != artifact") + } + if res.FileHash != sha256hex(ts.bytesV1) { + t.Errorf("result hash = %q, want %q", res.FileHash, sha256hex(ts.bytesV1)) + } + if _, err := os.Stat(out + ".part"); !os.IsNotExist(err) { + t.Error(".part file should be removed after a successful finalize") + } +} + +func TestFetch_ResumesFromPartial(t *testing.T) { + ts := newTestServer(t, MembershipOpen, nil) + httpSrv := httptest.NewServer(ts.srv.Handler()) + defer httpSrv.Close() + + out := filepath.Join(t.TempDir(), "ds.jsonl") + // Simulate an interrupted earlier run: the first 10 bytes already on disk. + if err := os.WriteFile(out+".part", ts.bytesV1[:10], 0o644); err != nil { + t.Fatal(err) + } + + res, err := Fetch(context.Background(), FetchOptions{ + BaseURL: httpSrv.URL, ID: "ds", Version: 1, Token: ownerToken, OutPath: out, + }) + if err != nil { + t.Fatalf("Fetch: %v", err) + } + if !res.Resumed { + t.Error("expected Resumed=true from a pre-existing .part") + } + got, _ := os.ReadFile(out) + if !bytes.Equal(got, ts.bytesV1) { + t.Errorf("resumed file = %q, want full artifact", got) + } +} + +func TestFetch_RejectsHashMismatch(t *testing.T) { + // A malicious/buggy server that serves the wrong bytes but advertises the + // real hash must be caught by the whole-file verification. + real := []byte("the-real-bytes\n") + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("X-Dataset-File-Hash", sha256hex(real)) + w.Header().Set("X-Dataset-Version", "1") + _, _ = w.Write([]byte("TAMPERED-DIFFERENT-BYTES\n")) + })) + defer srv.Close() + + out := filepath.Join(t.TempDir(), "ds.jsonl") + _, err := Fetch(context.Background(), FetchOptions{BaseURL: srv.URL, ID: "ds", Version: 1, Token: "t", OutPath: out}) + if err == nil { + t.Fatal("Fetch accepted bytes that don't match the advertised hash") + } + if _, statErr := os.Stat(out); !os.IsNotExist(statErr) { + t.Error("a failed verification must not leave a finalized output file") + } +} + +func TestFetch_RefusesUnverifiableDownload(t *testing.T) { + // No X-Dataset-File-Hash -> refuse (don't write an unverifiable file). + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte("anything")) + })) + defer srv.Close() + out := filepath.Join(t.TempDir(), "ds.jsonl") + if _, err := Fetch(context.Background(), FetchOptions{BaseURL: srv.URL, ID: "ds", Token: "t", OutPath: out}); err == nil { + t.Error("Fetch accepted a download with no file-hash commitment") + } +} + +func TestVerifyFile(t *testing.T) { + path := filepath.Join(t.TempDir(), "f") + _ = os.WriteFile(path, []byte("abc"), 0o644) + if err := VerifyFile(path, sha256hex([]byte("abc"))); err != nil { + t.Errorf("VerifyFile good: %v", err) + } + if err := VerifyFile(path, sha256hex([]byte("xyz"))); err == nil { + t.Error("VerifyFile should reject a wrong hash") + } +} diff --git a/internal/dataset/coverage_test.go b/internal/dataset/coverage_test.go new file mode 100644 index 00000000..ae5e4e67 --- /dev/null +++ b/internal/dataset/coverage_test.go @@ -0,0 +1,184 @@ +package dataset + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" +) + +func TestFileArtifacts(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "v1.jsonl") + want := []byte("line1\nline2\n") + if err := os.WriteFile(path, want, 0o644); err != nil { + t.Fatal(err) + } + fa := NewFileArtifacts() + fa.Set(1, path) + + rc, _, closeFn, err := fa.Open(1) + if err != nil { + t.Fatalf("Open(1): %v", err) + } + defer closeFn() + got := make([]byte, len(want)) + if _, err := rc.Read(got); err != nil { + t.Fatalf("read: %v", err) + } + if !bytes.Equal(got, want) { + t.Errorf("read %q, want %q", got, want) + } + if _, _, _, err := fa.Open(2); err == nil { + t.Error("Open(2) should error — no artifact registered") + } + if _, _, _, err := fa.Open(0); err == nil { + t.Error("Open(0) should error") + } +} + +func TestServer_InviteFlow_ApproveThenToken(t *testing.T) { + ts := newTestServer(t, MembershipInvite, nil) + h := ts.srv.Handler() + + // Worker requests a code (NOT auto-approved in invite mode). + cw := do(t, h, "POST", "/auth/device/code", "", nil) + var grant struct { + DeviceCode string `json:"device_code"` + UserCode string `json:"user_code"` + } + _ = json.Unmarshal(cw.Body.Bytes(), &grant) + + // Polling before approval yields authorization_pending (no token). + pw := postJSON(t, h, "/auth/device/token", `{"device_code":"`+grant.DeviceCode+`"}`, "") + if pw.Code != http.StatusOK { + t.Fatalf("pre-approval poll = %d", pw.Code) + } + + // Non-owner cannot approve. + if w := postJSON(t, h, "/auth/device/approve", `{"user_code":"`+grant.UserCode+`"}`, "not-owner"); w.Code != http.StatusUnauthorized { + t.Errorf("non-owner approve = %d, want 401", w.Code) + } + // Owner approves. + if w := postJSON(t, h, "/auth/device/approve", `{"user_code":"`+grant.UserCode+`"}`, ownerToken); w.Code != http.StatusOK { + t.Fatalf("owner approve = %d, body %s", w.Code, w.Body.String()) + } + // Bad approve body. + if w := postJSON(t, h, "/auth/device/approve", `{}`, ownerToken); w.Code != http.StatusBadRequest { + t.Errorf("empty approve = %d, want 400", w.Code) + } + + // Now polling mints a token. + tw := postJSON(t, h, "/auth/device/token", `{"device_code":"`+grant.DeviceCode+`"}`, "") + var tok struct { + Token string `json:"token"` + } + _ = json.Unmarshal(tw.Body.Bytes(), &tok) + if tok.Token == "" { + t.Fatal("no token after approval") + } + + // That member token can read the member-gated versions list. + vw := do(t, h, "GET", "/dataset/ds/versions", tok.Token, nil) + if vw.Code != http.StatusOK { + t.Errorf("member versions = %d, want 200", vw.Code) + } + // Bad device token request. + if w := postJSON(t, h, "/auth/device/token", `{}`, ""); w.Code != http.StatusBadRequest { + t.Errorf("empty device token = %d, want 400", w.Code) + } +} + +func TestServer_ErrorPaths(t *testing.T) { + t.Run("join paid disabled when nil", func(t *testing.T) { + ts := newTestServer(t, MembershipInvite, nil) + w := do(t, ts.srv.Handler(), "POST", "/dataset/ds/join/paid", "", nil) + if w.Code != http.StatusServiceUnavailable { + t.Errorf("= %d, want 503", w.Code) + } + }) + t.Run("join paid rejects payment error", func(t *testing.T) { + ts := newTestServer(t, MembershipInvite, fakePayments{err: errFake}) + w := do(t, ts.srv.Handler(), "POST", "/dataset/ds/join/paid", "", nil) + if w.Code != http.StatusPaymentRequired { + t.Errorf("= %d, want 402", w.Code) + } + }) + t.Run("join paid unknown version", func(t *testing.T) { + ts := newTestServer(t, MembershipInvite, fakePayments{version: 99}) + w := do(t, ts.srv.Handler(), "POST", "/dataset/ds/join/paid", "", nil) + if w.Code != http.StatusBadRequest { + t.Errorf("= %d, want 400", w.Code) + } + }) + t.Run("unknown dataset id 404", func(t *testing.T) { + ts := newTestServer(t, MembershipOpen, nil) + w := do(t, ts.srv.Handler(), "GET", "/dataset/other/versions", ownerToken, nil) + if w.Code != http.StatusNotFound { + t.Errorf("= %d, want 404", w.Code) + } + }) + t.Run("download bad version 404", func(t *testing.T) { + ts := newTestServer(t, MembershipOpen, nil) + h := ts.srv.Handler() + for _, q := range []string{"abc", "0", "99"} { + w := do(t, h, "GET", "/dataset/ds/download?version="+q, ownerToken, nil) + if w.Code != http.StatusNotFound { + t.Errorf("version=%q = %d, want 404", q, w.Code) + } + } + }) + t.Run("download artifact missing", func(t *testing.T) { + // Log has v1 but the artifact source has nothing. + ts := newTestServer(t, MembershipOpen, nil) + ts.srv.artifacts = memArtifacts{data: map[int][]byte{}} + w := do(t, ts.srv.Handler(), "GET", "/dataset/ds/download?version=1", ownerToken, nil) + if w.Code != http.StatusNotFound { + t.Errorf("= %d, want 404", w.Code) + } + }) +} + +func TestRecoverSigner_BadInputs(t *testing.T) { + v := EthVerifier{} + var d [32]byte + if _, err := v.RecoverSigner(d, "zz"); err == nil { + t.Error("non-hex sig accepted") + } + if _, err := v.RecoverSigner(d, "abcd"); err == nil { + t.Error("wrong-length sig accepted") + } +} + +func TestLog_LenAndStorePath(t *testing.T) { + l := NewLog() + if l.Len() != 0 { + t.Errorf("empty Len = %d", l.Len()) + } + s := NewStore("/tmp/x.json") + if s.Path() != "/tmp/x.json" { + t.Errorf("Path = %q", s.Path()) + } +} + +// helpers + +var errFake = &fakeErr{} + +type fakeErr struct{} + +func (*fakeErr) Error() string { return "fake payment failure" } + +func postJSON(t *testing.T, h http.Handler, target, body, token string) *httptest.ResponseRecorder { + t.Helper() + r := httptest.NewRequest("POST", target, bytes.NewReader([]byte(body))) + if token != "" { + r.Header.Set("Authorization", "Bearer "+token) + } + w := httptest.NewRecorder() + h.ServeHTTP(w, r) + return w +} diff --git a/internal/dataset/entitlement.go b/internal/dataset/entitlement.go new file mode 100644 index 00000000..d3934eac --- /dev/null +++ b/internal/dataset/entitlement.go @@ -0,0 +1,77 @@ +package dataset + +import "sync" + +// Entitlement records what a single member token may download. A +// payment-minted token is scoped to the exact version it paid for; an +// owner-admitted worker gets MaxVersion = head (full access). The raw token +// is never stored — only its SHA-256 hash (the same hash groupauth keys on), +// so the entitlement map can be persisted without holding a credential. +type Entitlement struct { + TokenHash string `json:"tokenHash"` + GroupID string `json:"groupID"` + MaxVersion int `json:"maxVersion"` + PaidAtomic string `json:"paidAtomic,omitempty"` + Label string `json:"label,omitempty"` +} + +// Entitlements is the concurrent token-hash -> Entitlement map that enforces +// the version-scope invariant the bare groupauth member() gate cannot express +// (member() only proves group membership, not which version was paid for). +type Entitlements struct { + mu sync.Mutex + byHash map[string]Entitlement +} + +// NewEntitlements returns an empty map. +func NewEntitlements() *Entitlements { + return &Entitlements{byHash: map[string]Entitlement{}} +} + +// Grant records (or overwrites) an entitlement. +func (e *Entitlements) Grant(ent Entitlement) { + e.mu.Lock() + defer e.mu.Unlock() + e.byHash[ent.TokenHash] = ent +} + +// Lookup returns the entitlement for a token hash. +func (e *Entitlements) Lookup(tokenHash string) (Entitlement, bool) { + e.mu.Lock() + defer e.mu.Unlock() + ent, ok := e.byHash[tokenHash] + return ent, ok +} + +// Allows reports whether the token hash may download the given version +// (1 <= version <= MaxVersion). An unknown token is never allowed. +func (e *Entitlements) Allows(tokenHash string, version int) bool { + e.mu.Lock() + defer e.mu.Unlock() + ent, ok := e.byHash[tokenHash] + if !ok { + return false + } + return version >= 1 && version <= ent.MaxVersion +} + +// All returns every entitlement (for persistence), in unspecified order. +func (e *Entitlements) All() []Entitlement { + e.mu.Lock() + defer e.mu.Unlock() + out := make([]Entitlement, 0, len(e.byHash)) + for _, v := range e.byHash { + out = append(out, v) + } + return out +} + +// Load replaces the map from persisted entitlements (rehydration after restart). +func (e *Entitlements) Load(ents []Entitlement) { + e.mu.Lock() + defer e.mu.Unlock() + e.byHash = make(map[string]Entitlement, len(ents)) + for _, ent := range ents { + e.byHash[ent.TokenHash] = ent + } +} diff --git a/internal/dataset/entitlement_test.go b/internal/dataset/entitlement_test.go new file mode 100644 index 00000000..02066b01 --- /dev/null +++ b/internal/dataset/entitlement_test.go @@ -0,0 +1,62 @@ +package dataset + +import ( + "reflect" + "sort" + "testing" +) + +func TestEntitlements_Allows(t *testing.T) { + e := NewEntitlements() + e.Grant(Entitlement{TokenHash: "v2tok", GroupID: "g", MaxVersion: 2}) + e.Grant(Entitlement{TokenHash: "headtok", GroupID: "g", MaxVersion: 5}) + + tests := []struct { + name string + tokenHash string + version int + want bool + }{ + {"paid v2 can fetch v1", "v2tok", 1, true}, + {"paid v2 can fetch v2", "v2tok", 2, true}, + {"paid v2 CANNOT fetch v3", "v2tok", 3, false}, + {"worker head can fetch v5", "headtok", 5, true}, + {"unknown token denied", "ghost", 1, false}, + {"version zero denied", "headtok", 0, false}, + {"negative version denied", "headtok", -1, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := e.Allows(tt.tokenHash, tt.version); got != tt.want { + t.Errorf("Allows(%q, %d) = %v, want %v", tt.tokenHash, tt.version, got, tt.want) + } + }) + } +} + +func TestEntitlements_LoadAllRoundTrip(t *testing.T) { + e := NewEntitlements() + in := []Entitlement{ + {TokenHash: "a", GroupID: "g", MaxVersion: 1, PaidAtomic: "1000"}, + {TokenHash: "b", GroupID: "g", MaxVersion: 3}, + } + e.Load(in) + + if ent, ok := e.Lookup("a"); !ok || ent.PaidAtomic != "1000" { + t.Errorf("Lookup(a) = %+v, %v", ent, ok) + } + got := e.All() + sort.Slice(got, func(i, j int) bool { return got[i].TokenHash < got[j].TokenHash }) + if !reflect.DeepEqual(got, in) { + t.Errorf("All() = %+v, want %+v", got, in) + } +} + +func TestEntitlements_GrantOverwrites(t *testing.T) { + e := NewEntitlements() + e.Grant(Entitlement{TokenHash: "a", MaxVersion: 1}) + e.Grant(Entitlement{TokenHash: "a", MaxVersion: 9}) // top-up to a newer version + if !e.Allows("a", 9) { + t.Error("Grant did not overwrite MaxVersion") + } +} diff --git a/internal/dataset/helpers_test.go b/internal/dataset/helpers_test.go new file mode 100644 index 00000000..bf197e79 --- /dev/null +++ b/internal/dataset/helpers_test.go @@ -0,0 +1,89 @@ +package dataset + +import ( + "os" + "path/filepath" + "testing" + + ethcrypto "github.com/ethereum/go-ethereum/crypto" +) + +func TestReadBundle(t *testing.T) { + dir := t.TempDir() + artifact := []byte(`{"messages":[]}` + "\n") + if err := os.WriteFile(filepath.Join(dir, "sft.jsonl"), artifact, 0o644); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(dir, "manifest.json"), + []byte(`{"hash":"`+hashA+`","files":["chatml.jsonl","sft.jsonl"]}`), 0o644); err != nil { + t.Fatal(err) + } + + mh, path, fh, size, err := ReadBundle(dir) + if err != nil { + t.Fatalf("ReadBundle: %v", err) + } + if mh != hashA { + t.Errorf("manifestHash = %q, want %q", mh, hashA) + } + if filepath.Base(path) != "sft.jsonl" { + t.Errorf("picked %q, want sft.jsonl (instruction-format preference)", filepath.Base(path)) + } + if fh != sha256hex(artifact) { + t.Errorf("fileHash = %q, want %q", fh, sha256hex(artifact)) + } + if size != int64(len(artifact)) { + t.Errorf("size = %d, want %d", size, len(artifact)) + } +} + +func TestReadBundle_Errors(t *testing.T) { + t.Run("missing manifest", func(t *testing.T) { + if _, _, _, _, err := ReadBundle(t.TempDir()); err == nil { + t.Error("missing manifest should error") + } + }) + t.Run("bad hash length", func(t *testing.T) { + dir := t.TempDir() + _ = os.WriteFile(filepath.Join(dir, "manifest.json"), []byte(`{"hash":"abcd","files":["a.jsonl"]}`), 0o644) + if _, _, _, _, err := ReadBundle(dir); err == nil { + t.Error("short hash should error") + } + }) + t.Run("no jsonl artifact", func(t *testing.T) { + dir := t.TempDir() + _ = os.WriteFile(filepath.Join(dir, "manifest.json"), []byte(`{"hash":"`+hashA+`","files":["readme.txt"]}`), 0o644) + if _, _, _, _, err := ReadBundle(dir); err == nil { + t.Error("no jsonl should error") + } + }) +} + +func TestLoadOrCreateKey(t *testing.T) { + path := filepath.Join(t.TempDir(), "keys", "ds.key") + + k1, err := LoadOrCreateKey(path) + if err != nil { + t.Fatalf("create: %v", err) + } + if _, err := os.Stat(path); err != nil { + t.Fatalf("key not persisted: %v", err) + } + // Second load returns the same key (stable owner identity). + k2, err := LoadOrCreateKey(path) + if err != nil { + t.Fatalf("reload: %v", err) + } + a1 := ethcrypto.PubkeyToAddress(k1.PublicKey) + a2 := ethcrypto.PubkeyToAddress(k2.PublicKey) + if a1 != a2 { + t.Errorf("reloaded key changed address: %s != %s", a1.Hex(), a2.Hex()) + } + + // Corrupt file -> error. + bad := filepath.Join(t.TempDir(), "bad.key") + _ = os.WriteFile(bad, []byte("not-a-key"), 0o600) + if _, err := LoadOrCreateKey(bad); err == nil { + t.Error("corrupt key file should error") + } +} diff --git a/internal/dataset/keyfile.go b/internal/dataset/keyfile.go new file mode 100644 index 00000000..19fadf66 --- /dev/null +++ b/internal/dataset/keyfile.go @@ -0,0 +1,43 @@ +package dataset + +import ( + "crypto/ecdsa" + "encoding/hex" + "fmt" + "os" + "path/filepath" + "strings" + + ethcrypto "github.com/ethereum/go-ethereum/crypto" +) + +// LoadOrCreateKey loads a hex-encoded secp256k1 private key from path, or +// generates one and persists it (0600) on first use. This is the owner's +// dataset-signing key: it signs the version log and its address is the owner +// identity buyers pin via Verify. Operators who already control an on-chain +// registration key can point path at that key's hex to unify the identity. +func LoadOrCreateKey(path string) (*ecdsa.PrivateKey, error) { + data, err := os.ReadFile(path) + switch { + case err == nil: + key, perr := ethcrypto.HexToECDSA(strings.TrimSpace(string(data))) + if perr != nil { + return nil, fmt.Errorf("dataset: parse signing key %s: %w", path, perr) + } + return key, nil + case !os.IsNotExist(err): + return nil, fmt.Errorf("dataset: read signing key %s: %w", path, err) + } + + key, err := ethcrypto.GenerateKey() + if err != nil { + return nil, err + } + if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + return nil, err + } + if err := os.WriteFile(path, []byte(hex.EncodeToString(ethcrypto.FromECDSA(key))), 0o600); err != nil { + return nil, fmt.Errorf("dataset: persist signing key: %w", err) + } + return key, nil +} diff --git a/internal/dataset/server.go b/internal/dataset/server.go new file mode 100644 index 00000000..8fb1325c --- /dev/null +++ b/internal/dataset/server.go @@ -0,0 +1,447 @@ +package dataset + +import ( + "crypto/subtle" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "strconv" + "strings" + + "github.com/ObolNetwork/obol-stack/internal/research/groupauth" +) + +// Membership modes (mirror internal/research/server). +const ( + MembershipOpen = "open" + MembershipInvite = "invite" +) + +// PaymentValidator validates a forwarded proof-of-payment for the paid-join +// path. It runs ONLY behind the edge x402-verifier ForwardAuth (which has +// already proven a settled payment); its job is to confirm the payment binds +// to THIS dataset offer and to extract which version + atomic amount was paid. +// It must never be exposed as a raw public route. +type PaymentValidator interface { + Validate(r *http.Request, offerID string) (version int, atomic string, err error) +} + +// Config builds a Server. Log/Ents/Store/Artifacts are owned by the caller so +// the CLI can rehydrate them from disk before serving. +type Config struct { + ID string // dataset id; appears in the route and as the default group id + GroupID string // membership group id (defaults to ID) + Membership string // open | invite (default invite) + OwnerToken string // gates owner-only routes; also a download superuser + OwnerSigner string // 0x address that must have signed the version log (verify pins it) + Log *Log + Ents *Entitlements + Store *Store + Artifacts Artifacts + Payments PaymentValidator + Logger *slog.Logger +} + +// Server hosts one versioned dataset over an owner-run, membership-gated HTTP +// surface. Bytes never leave the owner machine un-gated. +type Server struct { + id string + groupID string + membership string + owner string + ownerSig string + auth *groupauth.Authority + log *Log + ents *Entitlements + store *Store + artifacts Artifacts + payments PaymentValidator + logger *slog.Logger +} + +// NewServer builds a Server from cfg, rehydrating the in-memory groupauth +// Authority from any persisted entitlements so paying members survive a +// restart. +func NewServer(cfg Config) *Server { + if cfg.Logger == nil { + cfg.Logger = slog.Default() + } + if cfg.Membership == "" { + cfg.Membership = MembershipInvite + } + if cfg.GroupID == "" { + cfg.GroupID = cfg.ID + } + if cfg.Log == nil { + cfg.Log = NewLog() + } + if cfg.Ents == nil { + cfg.Ents = NewEntitlements() + } + s := &Server{ + id: cfg.ID, + groupID: cfg.GroupID, + membership: cfg.Membership, + owner: cfg.OwnerToken, + ownerSig: strings.ToLower(strings.TrimSpace(cfg.OwnerSigner)), + auth: groupauth.New(), + log: cfg.Log, + ents: cfg.Ents, + store: cfg.Store, + artifacts: cfg.Artifacts, + payments: cfg.Payments, + logger: cfg.Logger, + } + // Rehydrate groupauth from persisted entitlements (verified by hash; the + // raw token is never needed). + for _, ent := range s.ents.All() { + s.auth.RegisterHash(ent.TokenHash, ent.GroupID, ent.Label) + } + return s +} + +// Handler returns the HTTP mux. +func (s *Server) Handler() http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) }) + + // Device-auth (public — the device_code is the secret, RFC 8628). + mux.HandleFunc("POST /auth/device/code", s.handleDeviceCode) + mux.HandleFunc("POST /auth/device/token", s.handleDeviceToken) + mux.HandleFunc("POST /auth/device/approve", s.ownerOnly(s.handleApprove)) + + // Paid join — payment mints a version-scoped member token. Behind the + // edge x402-verifier ForwardAuth, never a raw public route. + mux.HandleFunc("POST /dataset/{id}/join/paid", s.handleJoinPaid) + + // Member-gated reads. + mux.HandleFunc("GET /dataset/{id}/versions", s.member(s.handleVersions)) + mux.HandleFunc("GET /dataset/{id}/verify", s.member(s.handleVerify)) + + // Entitlement-gated (member + version) download with Range support. + mux.HandleFunc("GET /dataset/{id}/download", s.downloadGate(s.handleDownload)) + + // Owner-only operational view. + mux.HandleFunc("GET /dataset/{id}/status", s.ownerOnly(s.handleStatus)) + return mux +} + +// --- device-auth (mirrors internal/research/server) --- + +func (s *Server) handleDeviceCode(w http.ResponseWriter, r *http.Request) { + var body struct { + Worker string `json:"worker"` + } + _ = json.NewDecoder(r.Body).Decode(&body) + grant, err := s.auth.RequestCode(body.Worker) + if err != nil { + writeErr(w, http.StatusInternalServerError, "server_error", "failed to create code") + return + } + if s.membership == MembershipOpen { + _ = s.auth.Approve(s.groupID, grant.UserCode) + } + writeJSON(w, http.StatusOK, grant) +} + +func (s *Server) handleDeviceToken(w http.ResponseWriter, r *http.Request) { + var body struct { + DeviceCode string `json:"device_code"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.DeviceCode == "" { + writeErr(w, http.StatusBadRequest, "invalid_request", "device_code required") + return + } + res, err := s.auth.Poll(body.DeviceCode) + switch err { + case nil: + // A freshly-issued device-auth token is an owner-admitted worker: + // grant full access (MaxVersion = head) and persist it. + if res.Token != "" { + head := 0 + if h, ok := s.log.Head(); ok { + head = h.Seq + } + s.ents.Grant(Entitlement{ + TokenHash: groupauth.HashToken(res.Token), + GroupID: s.groupID, + MaxVersion: head, + Label: "worker", + }) + s.persist() + } + writeJSON(w, http.StatusOK, res) + case groupauth.ErrExpired: + writeErr(w, http.StatusGone, "expired_token", "device code expired") + default: + writeErr(w, http.StatusNotFound, "invalid_grant", "device code not found") + } +} + +func (s *Server) handleApprove(w http.ResponseWriter, r *http.Request) { + var body struct { + UserCode string `json:"user_code"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.UserCode == "" { + writeErr(w, http.StatusBadRequest, "invalid_request", "user_code required") + return + } + switch err := s.auth.Approve(s.groupID, body.UserCode); err { + case nil: + writeJSON(w, http.StatusOK, map[string]string{"status": "approved"}) + case groupauth.ErrExpired: + writeErr(w, http.StatusGone, "expired_code", "code expired") + case groupauth.ErrAlreadyUsed: + writeErr(w, http.StatusConflict, "already_used", "code already used") + default: + writeErr(w, http.StatusNotFound, "invalid_code", "code not found") + } +} + +// --- paid join --- + +func (s *Server) handleJoinPaid(w http.ResponseWriter, r *http.Request) { + if !s.matchID(r) { + writeErr(w, http.StatusNotFound, "unknown_dataset", "no such dataset on this host") + return + } + if s.payments == nil { + writeErr(w, http.StatusServiceUnavailable, "paid_join_disabled", "paid join not configured") + return + } + version, atomic, err := s.payments.Validate(r, s.id) + if err != nil { + writeErr(w, http.StatusPaymentRequired, "payment_required", err.Error()) + return + } + if _, ok := s.log.Get(version); !ok { + writeErr(w, http.StatusBadRequest, "unknown_version", fmt.Sprintf("version %d not published", version)) + return + } + raw, hash, err := s.auth.Mint(s.groupID, "paid-v"+strconv.Itoa(version)) + if err != nil { + writeErr(w, http.StatusInternalServerError, "server_error", "failed to mint token") + return + } + s.ents.Grant(Entitlement{ + TokenHash: hash, + GroupID: s.groupID, + MaxVersion: version, + PaidAtomic: atomic, + Label: "paid", + }) + s.persist() + s.logger.Info("paid join", "dataset", s.id, "version", version, "atomic", atomic) + writeJSON(w, http.StatusOK, map[string]any{"token": raw, "version": version}) +} + +// --- member-gated reads --- + +func (s *Server) handleVersions(w http.ResponseWriter, r *http.Request) { + if !s.matchID(r) { + writeErr(w, http.StatusNotFound, "unknown_dataset", "no such dataset on this host") + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "id": s.id, + "versions": s.log.Versions(), + }) +} + +func (s *Server) handleVerify(w http.ResponseWriter, r *http.Request) { + if !s.matchID(r) { + writeErr(w, http.StatusNotFound, "unknown_dataset", "no such dataset on this host") + return + } + err := s.log.Verify(EthVerifier{}, s.ownerSig) + head := 0 + if h, ok := s.log.Head(); ok { + head = h.Seq + } + resp := map[string]any{"valid": err == nil, "head": head, "owner": s.ownerSig} + if err != nil { + resp["error"] = err.Error() + } + writeJSON(w, http.StatusOK, resp) +} + +// --- download --- + +func (s *Server) handleDownload(w http.ResponseWriter, r *http.Request, version int) { + v, ok := s.log.Get(version) + if !ok { + writeErr(w, http.StatusNotFound, "unknown_version", fmt.Sprintf("version %d not published", version)) + return + } + if s.artifacts == nil { + writeErr(w, http.StatusServiceUnavailable, "no_artifacts", "artifact source not configured") + return + } + content, modtime, closeFn, err := s.artifacts.Open(version) + if err != nil { + writeErr(w, http.StatusNotFound, "artifact_missing", err.Error()) + return + } + defer closeFn() + + // Whole-artifact commitments: sent on 200 AND 206 alike, so a resumed or + // multi-connection download verifies against the full-file hash after + // reassembly (the hash is of the whole file, never a chunk). + w.Header().Set("X-Dataset-Version", strconv.Itoa(v.Seq)) + w.Header().Set("X-Dataset-Manifest-Hash", v.ManifestHash) + w.Header().Set("X-Dataset-File-Hash", v.FileHash) + w.Header().Set("Content-Type", "application/x-ndjson") + + // http.ServeContent handles Accept-Ranges, Range -> 206, Content-Range, + // and conditional requests for us. + http.ServeContent(w, r, fmt.Sprintf("%s-v%d.jsonl", s.id, v.Seq), modtime, content) +} + +// downloadGate enforces group membership AND version entitlement before +// handing the resolved version to next. The owner token is a superuser. +func (s *Server) downloadGate(next func(http.ResponseWriter, *http.Request, int)) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !s.matchID(r) { + writeErr(w, http.StatusNotFound, "unknown_dataset", "no such dataset on this host") + return + } + tok := bearer(r) + if tok == "" { + writeErr(w, http.StatusUnauthorized, "auth_required", "member token required") + return + } + version, ok := s.resolveVersion(r) + if !ok { + writeErr(w, http.StatusNotFound, "unknown_version", "requested version is not published") + return + } + if s.isOwner(tok) { + next(w, r, version) + return + } + gid, ok := s.auth.VerifyToken(tok) + if !ok || gid != s.groupID { + writeErr(w, http.StatusForbidden, "not_a_member", "token is not a member of this dataset") + return + } + if !s.ents.Allows(groupauth.HashToken(tok), version) { + writeErr(w, http.StatusForbidden, "version_not_entitled", + fmt.Sprintf("token is not entitled to version %d", version)) + return + } + next(w, r, version) + } +} + +// resolveVersion returns the requested ?version=N (default: head). ok==false +// when the log is empty or N is out of range. +func (s *Server) resolveVersion(r *http.Request) (int, bool) { + head, ok := s.log.Head() + if !ok { + return 0, false + } + q := strings.TrimSpace(r.URL.Query().Get("version")) + if q == "" { + return head.Seq, true + } + n, err := strconv.Atoi(q) + if err != nil || n < 1 || n > head.Seq { + return 0, false + } + return n, true +} + +// --- owner view --- + +func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { + if !s.matchID(r) { + writeErr(w, http.StatusNotFound, "unknown_dataset", "no such dataset on this host") + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "id": s.id, + "groupID": s.groupID, + "membership": s.membership, + "versions": s.log.Versions(), + "entitlements": len(s.ents.All()), + }) +} + +// --- middleware + helpers --- + +func (s *Server) member(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + tok := bearer(r) + if tok == "" { + writeErr(w, http.StatusUnauthorized, "auth_required", "member token required") + return + } + if s.isOwner(tok) { + next(w, r) + return + } + gid, ok := s.auth.VerifyToken(tok) + if !ok || gid != s.groupID { + writeErr(w, http.StatusForbidden, "not_a_member", "token is not a member of this dataset") + return + } + next(w, r) + } +} + +func (s *Server) ownerOnly(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if !s.isOwner(bearer(r)) { + writeErr(w, http.StatusUnauthorized, "owner_required", "owner token required") + return + } + next(w, r) + } +} + +func (s *Server) isOwner(tok string) bool { + return s.owner != "" && subtle.ConstantTimeCompare([]byte(tok), []byte(s.owner)) == 1 +} + +// matchID returns true when the {id} path value addresses this dataset. +func (s *Server) matchID(r *http.Request) bool { + id := r.PathValue("id") + return id == "" || id == s.id +} + +// persist snapshots the log + entitlements to the backing store (best-effort; +// a persistence failure is logged but does not fail the request — the +// in-memory state is authoritative for the live process). +func (s *Server) persist() { + if s.store == nil { + return + } + st := State{ + ID: s.id, + GroupID: s.groupID, + Versions: s.log.Versions(), + Entitlements: s.ents.All(), + } + if err := s.store.Save(st); err != nil { + s.logger.Error("dataset persist failed", "dataset", s.id, "err", err) + } +} + +func bearer(r *http.Request) string { + h := r.Header.Get("Authorization") + if v, ok := strings.CutPrefix(h, "Bearer "); ok { + return strings.TrimSpace(v) + } + return "" +} + +func writeJSON(w http.ResponseWriter, code int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(v) +} + +func writeErr(w http.ResponseWriter, code int, kind, msg string) { + writeJSON(w, code, map[string]string{"error": kind, "message": msg}) +} diff --git a/internal/dataset/server_test.go b/internal/dataset/server_test.go new file mode 100644 index 00000000..9e898a68 --- /dev/null +++ b/internal/dataset/server_test.go @@ -0,0 +1,284 @@ +package dataset + +import ( + "bytes" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + "time" +) + +const ownerToken = "owner-secret-token" + +// fakePayments stands in for the edge x402-verifier's forwarded proof. +type fakePayments struct { + version int + atomic string + err error +} + +func (f fakePayments) Validate(_ *http.Request, _ string) (int, string, error) { + return f.version, f.atomic, f.err +} + +// memArtifacts serves version bytes from memory (seekable for Range). +type memArtifacts struct{ data map[int][]byte } + +func (m memArtifacts) Open(version int) (io.ReadSeeker, time.Time, func() error, error) { + b, ok := m.data[version] + if !ok { + return nil, time.Time{}, nil, fmt.Errorf("no artifact v%d", version) + } + return bytes.NewReader(b), fixedTime, func() error { return nil }, nil +} + +func sha256hex(b []byte) string { + h := sha256.Sum256(b) + return hex.EncodeToString(h[:]) +} + +type testServer struct { + srv *Server + bytesV1 []byte + signer *EthSigner + store *Store +} + +func newTestServer(t *testing.T, membership string, payments PaymentValidator) testServer { + t.Helper() + signer := newTestSigner(t) + artifact := []byte(`{"messages":[{"role":"user","content":"hi"}]}` + "\n") + fileHash := sha256hex(artifact) + + log := NewLog() + if _, err := log.Append(hashA, fileHash, int64(len(artifact)), signer, fixedTime); err != nil { + t.Fatalf("append v1: %v", err) + } + store := NewStore(filepath.Join(t.TempDir(), "ds.json")) + srv := NewServer(Config{ + ID: "ds", + Membership: membership, + OwnerToken: ownerToken, + OwnerSigner: signer.SignerID(), + Log: log, + Ents: NewEntitlements(), + Store: store, + Artifacts: memArtifacts{data: map[int][]byte{1: artifact}}, + Payments: payments, + }) + return testServer{srv: srv, bytesV1: artifact, signer: signer, store: store} +} + +func do(t *testing.T, h http.Handler, method, target, token string, hdr map[string]string) *httptest.ResponseRecorder { + t.Helper() + r := httptest.NewRequest(method, target, nil) + if token != "" { + r.Header.Set("Authorization", "Bearer "+token) + } + for k, v := range hdr { + r.Header.Set(k, v) + } + w := httptest.NewRecorder() + h.ServeHTTP(w, r) + return w +} + +func TestServer_PaidJoinThenDownload(t *testing.T) { + ts := newTestServer(t, MembershipInvite, fakePayments{version: 1, atomic: "1000"}) + h := ts.srv.Handler() + + // Pay -> mint a version-1 token. + w := do(t, h, "POST", "/dataset/ds/join/paid", "", nil) + if w.Code != http.StatusOK { + t.Fatalf("join/paid = %d, body %s", w.Code, w.Body.String()) + } + var join struct { + Token string `json:"token"` + Version int `json:"version"` + } + if err := json.Unmarshal(w.Body.Bytes(), &join); err != nil { + t.Fatalf("join body: %v", err) + } + if join.Token == "" || join.Version != 1 { + t.Fatalf("join = %+v", join) + } + + // Download v1 with the minted token; verify whole-file hash matches. + dw := do(t, h, "GET", "/dataset/ds/download?version=1", join.Token, nil) + if dw.Code != http.StatusOK { + t.Fatalf("download = %d, body %s", dw.Code, dw.Body.String()) + } + if !bytes.Equal(dw.Body.Bytes(), ts.bytesV1) { + t.Error("downloaded bytes != artifact") + } + if got := dw.Header().Get("X-Dataset-File-Hash"); got != sha256hex(ts.bytesV1) { + t.Errorf("X-Dataset-File-Hash = %q, want %q", got, sha256hex(ts.bytesV1)) + } + if sha256hex(dw.Body.Bytes()) != dw.Header().Get("X-Dataset-File-Hash") { + t.Error("recomputed body hash != advertised file hash") + } + if dw.Header().Get("Accept-Ranges") != "bytes" { + t.Error("download did not advertise Range support") + } +} + +func TestServer_VersionScopeEnforced(t *testing.T) { + ts := newTestServer(t, MembershipInvite, fakePayments{version: 1, atomic: "1000"}) + h := ts.srv.Handler() + + // Append a v2 to the log so ?version=2 is a real (but unpaid) version. + if _, err := ts.srv.log.Append(hashB, hashB, 5, ts.signer, fixedTime); err != nil { + t.Fatalf("append v2: %v", err) + } + + w := do(t, h, "POST", "/dataset/ds/join/paid", "", nil) // pays for v1 + var join struct{ Token string } + _ = json.Unmarshal(w.Body.Bytes(), &join) + + // v1 token may fetch v1... + if dw := do(t, h, "GET", "/dataset/ds/download?version=1", join.Token, nil); dw.Code != http.StatusOK { + t.Errorf("v1 token download v1 = %d, want 200", dw.Code) + } + // ...but is forbidden from v2. + dw := do(t, h, "GET", "/dataset/ds/download?version=2", join.Token, nil) + if dw.Code != http.StatusForbidden { + t.Fatalf("v1 token download v2 = %d, want 403", dw.Code) + } + if got := errorKind(t, dw.Body.Bytes()); got != "version_not_entitled" { + t.Errorf("error = %q, want version_not_entitled", got) + } +} + +func TestServer_RangeReturns206WithWholeFileHash(t *testing.T) { + ts := newTestServer(t, MembershipOpen, nil) + h := ts.srv.Handler() + token := ownerToken // owner is a download superuser + + dw := do(t, h, "GET", "/dataset/ds/download?version=1", token, map[string]string{"Range": "bytes=0-3"}) + if dw.Code != http.StatusPartialContent { + t.Fatalf("range request = %d, want 206", dw.Code) + } + if !bytes.Equal(dw.Body.Bytes(), ts.bytesV1[:4]) { + t.Errorf("partial body = %q, want first 4 bytes %q", dw.Body.Bytes(), ts.bytesV1[:4]) + } + // Whole-file hash header must be present on 206 (commits to the full file). + if got := dw.Header().Get("X-Dataset-File-Hash"); got != sha256hex(ts.bytesV1) { + t.Errorf("206 X-Dataset-File-Hash = %q, want whole-file %q", got, sha256hex(ts.bytesV1)) + } +} + +func TestServer_GatesRejectNonMembersAndAnonymous(t *testing.T) { + ts := newTestServer(t, MembershipInvite, fakePayments{version: 1}) + h := ts.srv.Handler() + + if w := do(t, h, "GET", "/dataset/ds/download?version=1", "", nil); w.Code != http.StatusUnauthorized { + t.Errorf("anonymous download = %d, want 401", w.Code) + } + if w := do(t, h, "GET", "/dataset/ds/download?version=1", "obol-research-mt-bogus", nil); w.Code != http.StatusForbidden { + t.Errorf("non-member download = %d, want 403", w.Code) + } + if w := do(t, h, "GET", "/dataset/ds/status", "not-the-owner", nil); w.Code != http.StatusUnauthorized { + t.Errorf("non-owner status = %d, want 401", w.Code) + } + if w := do(t, h, "GET", "/dataset/ds/status", ownerToken, nil); w.Code != http.StatusOK { + t.Errorf("owner status = %d, want 200", w.Code) + } +} + +func TestServer_DeviceAuthAdmitGetsHeadAccess(t *testing.T) { + ts := newTestServer(t, MembershipOpen, nil) // open: auto-approved on code request + h := ts.srv.Handler() + + // device code (auto-approved) -> token + cw := do(t, h, "POST", "/auth/device/code", "", nil) + var grant struct { + DeviceCode string `json:"device_code"` + } + _ = json.Unmarshal(cw.Body.Bytes(), &grant) + + r := httptest.NewRequest("POST", "/auth/device/token", bytes.NewReader([]byte(`{"device_code":"`+grant.DeviceCode+`"}`))) + tw := httptest.NewRecorder() + h.ServeHTTP(tw, r) + var tok struct { + Token string `json:"token"` + } + _ = json.Unmarshal(tw.Body.Bytes(), &tok) + if tok.Token == "" { + t.Fatalf("no token issued: %s", tw.Body.String()) + } + + if dw := do(t, h, "GET", "/dataset/ds/download?version=1", tok.Token, nil); dw.Code != http.StatusOK { + t.Errorf("admitted worker download = %d, want 200", dw.Code) + } +} + +func TestServer_VerifyReportsChainHealth(t *testing.T) { + ts := newTestServer(t, MembershipOpen, nil) + h := ts.srv.Handler() + + w := do(t, h, "GET", "/dataset/ds/verify", ownerToken, nil) + if w.Code != http.StatusOK { + t.Fatalf("verify = %d", w.Code) + } + var res struct { + Valid bool `json:"valid"` + Head int `json:"head"` + } + _ = json.Unmarshal(w.Body.Bytes(), &res) + if !res.Valid || res.Head != 1 { + t.Errorf("verify = %+v, want valid head=1", res) + } +} + +func TestServer_RehydratesPaidMemberAfterRestart(t *testing.T) { + ts := newTestServer(t, MembershipInvite, fakePayments{version: 1, atomic: "1000"}) + h := ts.srv.Handler() + + jw := do(t, h, "POST", "/dataset/ds/join/paid", "", nil) + var join struct{ Token string } + _ = json.Unmarshal(jw.Body.Bytes(), &join) + + // Simulate a restart: load persisted state into a brand-new server. + st, err := ts.store.Load() + if err != nil { + t.Fatalf("load: %v", err) + } + if len(st.Entitlements) != 1 { + t.Fatalf("persisted entitlements = %d, want 1", len(st.Entitlements)) + } + restarted := NewServer(Config{ + ID: "ds", Membership: MembershipInvite, OwnerToken: ownerToken, + OwnerSigner: ts.signer.SignerID(), + Log: LogFromVersions(st.Versions), + Ents: loadEnts(st.Entitlements), + Store: ts.store, + Artifacts: memArtifacts{data: map[int][]byte{1: ts.bytesV1}}, + Payments: fakePayments{version: 1}, + }) + + // The pre-restart token still works — the member did not have to re-pay. + if dw := do(t, restarted.Handler(), "GET", "/dataset/ds/download?version=1", join.Token, nil); dw.Code != http.StatusOK { + t.Errorf("post-restart download = %d, want 200 (rehydration failed)", dw.Code) + } +} + +func loadEnts(ents []Entitlement) *Entitlements { + e := NewEntitlements() + e.Load(ents) + return e +} + +func errorKind(t *testing.T, body []byte) string { + t.Helper() + var e struct { + Error string `json:"error"` + } + _ = json.Unmarshal(body, &e) + return e.Error +} diff --git a/internal/dataset/signer.go b/internal/dataset/signer.go new file mode 100644 index 00000000..d30240ad --- /dev/null +++ b/internal/dataset/signer.go @@ -0,0 +1,59 @@ +package dataset + +import ( + "crypto/ecdsa" + "encoding/hex" + "fmt" + "strings" + + ethcrypto "github.com/ethereum/go-ethereum/crypto" +) + +// EthSigner signs version digests with a secp256k1 key — the same key kind +// used for the owner's on-chain (ERC-8004) registration. No new key custody: +// the caller supplies the already-loaded key. +type EthSigner struct { + priv *ecdsa.PrivateKey + addr string +} + +// NewEthSigner wraps a secp256k1 private key as a Signer. +func NewEthSigner(priv *ecdsa.PrivateKey) *EthSigner { + return &EthSigner{ + priv: priv, + addr: strings.ToLower(ethcrypto.PubkeyToAddress(priv.PublicKey).Hex()), + } +} + +// SignerID returns the lowercased 0x EVM address of the signing key. +func (s *EthSigner) SignerID() string { return s.addr } + +// SignDigest returns a 65-byte [R||S||V] secp256k1 signature, hex-encoded. +func (s *EthSigner) SignDigest(digest [32]byte) (string, error) { + sig, err := ethcrypto.Sign(digest[:], s.priv) + if err != nil { + return "", err + } + return hex.EncodeToString(sig), nil +} + +// EthVerifier recovers the signer's EVM address from a secp256k1 signature. +// It is the zero value of an empty struct — stateless and reusable. +type EthVerifier struct{} + +// RecoverSigner recovers the lowercased 0x EVM address that produced sigHex +// over digest. +func (EthVerifier) RecoverSigner(digest [32]byte, sigHex string) (string, error) { + sig, err := hex.DecodeString(sigHex) + if err != nil { + return "", fmt.Errorf("decode signature: %w", err) + } + if len(sig) != 65 { + return "", fmt.Errorf("signature length %d, want 65", len(sig)) + } + pub, err := ethcrypto.SigToPub(digest[:], sig) + if err != nil { + return "", fmt.Errorf("recover pubkey: %w", err) + } + return strings.ToLower(ethcrypto.PubkeyToAddress(*pub).Hex()), nil +} diff --git a/internal/dataset/store.go b/internal/dataset/store.go new file mode 100644 index 00000000..a1a7390f --- /dev/null +++ b/internal/dataset/store.go @@ -0,0 +1,89 @@ +package dataset + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "sync" +) + +// State is the restart-surviving state of a dataset server: its signed +// version log plus the member-token entitlements. Persisting the entitlements +// (by token hash) lets a continuously-sold dataset rehydrate paying members +// after a host restart instead of forcing every subscriber to re-pay. +type State struct { + ID string `json:"id"` + GroupID string `json:"groupID"` + Versions []DatasetVersion `json:"versions"` + Entitlements []Entitlement `json:"entitlements"` + // Artifacts maps a version Seq to the operator-local file path serving + // its bytes. It is local-only metadata (not part of the signed log), so a + // `publish` can rebuild its file-backed artifact source from disk. + Artifacts map[int]string `json:"artifacts,omitempty"` +} + +// Store persists State to a JSON file using an atomic temp-file + rename, so a +// crash mid-write never leaves a torn file (matches internal/network/record.go). +type Store struct { + path string + mu sync.Mutex +} + +// NewStore returns a Store backed by path. +func NewStore(path string) *Store { return &Store{path: path} } + +// Path is the backing file path. +func (s *Store) Path() string { return s.path } + +// Load reads the persisted state. A missing file is not an error — it returns +// the zero State so a fresh dataset starts clean. +func (s *Store) Load() (State, error) { + s.mu.Lock() + defer s.mu.Unlock() + data, err := os.ReadFile(s.path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return State{}, nil + } + return State{}, fmt.Errorf("dataset: read store %s: %w", s.path, err) + } + var st State + if err := json.Unmarshal(data, &st); err != nil { + return State{}, fmt.Errorf("dataset: parse store %s: %w", s.path, err) + } + return st, nil +} + +// Save atomically writes state: marshal -> temp file in the same dir -> fsync +// via close -> rename over the target. +func (s *Store) Save(st State) error { + s.mu.Lock() + defer s.mu.Unlock() + data, err := json.MarshalIndent(st, "", " ") + if err != nil { + return fmt.Errorf("dataset: marshal store: %w", err) + } + dir := filepath.Dir(s.path) + if err := os.MkdirAll(dir, 0o755); err != nil { + return fmt.Errorf("dataset: mkdir %s: %w", dir, err) + } + tmp, err := os.CreateTemp(dir, ".dataset-*.tmp") + if err != nil { + return fmt.Errorf("dataset: create temp: %w", err) + } + tmpName := tmp.Name() + defer os.Remove(tmpName) // no-op once the rename succeeds + if _, err := tmp.Write(data); err != nil { + tmp.Close() + return fmt.Errorf("dataset: write temp: %w", err) + } + if err := tmp.Close(); err != nil { + return fmt.Errorf("dataset: close temp: %w", err) + } + if err := os.Rename(tmpName, s.path); err != nil { + return fmt.Errorf("dataset: rename temp over %s: %w", s.path, err) + } + return nil +} diff --git a/internal/dataset/store_test.go b/internal/dataset/store_test.go new file mode 100644 index 00000000..f2e3dac0 --- /dev/null +++ b/internal/dataset/store_test.go @@ -0,0 +1,68 @@ +package dataset + +import ( + "path/filepath" + "reflect" + "testing" + "time" +) + +func TestStore_SaveLoadRoundTrip(t *testing.T) { + path := filepath.Join(t.TempDir(), "nested", "state.json") + store := NewStore(path) + + want := State{ + ID: "pi-sessions", + GroupID: "grp-123", + Versions: []DatasetVersion{ + {Seq: 1, ManifestHash: hashA, FileHash: hashA, Size: 100, Timestamp: fixedTime, Signature: "sig1"}, + {Seq: 2, ManifestHash: hashB, FileHash: hashB, Size: 200, PrevSig: "sig1", Timestamp: fixedTime, Signature: "sig2"}, + }, + Entitlements: []Entitlement{{TokenHash: "tok", GroupID: "grp-123", MaxVersion: 2, PaidAtomic: "2000"}}, + } + + if err := store.Save(want); err != nil { + t.Fatalf("Save: %v", err) // also creates the nested dir + } + got, err := store.Load() + if err != nil { + t.Fatalf("Load: %v", err) + } + if !reflect.DeepEqual(got, want) { + t.Errorf("round-trip mismatch:\n got %+v\nwant %+v", got, want) + } +} + +func TestStore_LoadMissingFileIsEmpty(t *testing.T) { + store := NewStore(filepath.Join(t.TempDir(), "does-not-exist.json")) + got, err := store.Load() + if err != nil { + t.Fatalf("Load of missing file should not error, got %v", err) + } + if !reflect.DeepEqual(got, State{}) { + t.Errorf("missing-file Load = %+v, want zero State", got) + } +} + +func TestStore_SaveOverwritesAtomically(t *testing.T) { + path := filepath.Join(t.TempDir(), "state.json") + store := NewStore(path) + + if err := store.Save(State{ID: "v1", Versions: []DatasetVersion{{Seq: 1, ManifestHash: hashA, FileHash: hashA, Timestamp: fixedTime}}}); err != nil { + t.Fatalf("first Save: %v", err) + } + if err := store.Save(State{ID: "v2", Versions: []DatasetVersion{ + {Seq: 1, ManifestHash: hashA, FileHash: hashA, Timestamp: fixedTime}, + {Seq: 2, ManifestHash: hashB, FileHash: hashB, Timestamp: fixedTime.Add(time.Hour)}, + }}); err != nil { + t.Fatalf("second Save: %v", err) + } + + got, err := store.Load() + if err != nil { + t.Fatalf("Load: %v", err) + } + if got.ID != "v2" || len(got.Versions) != 2 { + t.Errorf("overwrite failed: got ID=%q versions=%d, want v2/2", got.ID, len(got.Versions)) + } +} diff --git a/internal/dataset/versionlog.go b/internal/dataset/versionlog.go new file mode 100644 index 00000000..e212906e --- /dev/null +++ b/internal/dataset/versionlog.go @@ -0,0 +1,244 @@ +// Package dataset implements the owner-hosted, membership-gated serving of +// versioned dataset artifacts: a signed, hash-chained version log +// (content-addressed by manifestHash), a token -> version entitlement map, +// and an HTTP server that streams artifacts to paying/approved members. +// +// The version log is a money/integrity primitive: each published version is +// signed by the owner's secp256k1 key (the same key used for on-chain +// registration) over a canonical, length-prefixed, domain-separated digest, +// and chained to its predecessor's signature so a third party replaying or +// reordering a fetched log is detectable offline. +package dataset + +import ( + "crypto/sha256" + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "strings" + "sync" + "time" +) + +// digestDST is the 32-byte domain-separation tag mixed into every version +// digest. It prevents a signature minted here from being replayed as a +// signature in any other obol protocol. The string is zero-padded to 32 bytes. +const digestDST = "obol/dataset/versionlog/v1" + +// DatasetVersion is one immutable, signed entry in a dataset's version log. +// The human version tag is Seq (v1, v2, ...). ManifestHash is the +// content-address anchor; FileHash is the SHA-256 of the whole served +// artifact; Size binds the per-version price. PrevSig chains to the +// predecessor's Signature ("" at Seq==1). +type DatasetVersion struct { + Seq int `json:"seq"` + ManifestHash string `json:"manifestHash"` + FileHash string `json:"fileHash"` + Size int64 `json:"size"` + PrevSig string `json:"prevSig"` + Timestamp time.Time `json:"timestamp"` + Signature string `json:"signature"` +} + +// Signer signs a 32-byte digest with the owner's key. SignerID is the +// signer's lowercased 0x EVM address (recoverable from the signature). +type Signer interface { + SignerID() string + SignDigest(digest [32]byte) (sigHex string, err error) +} + +// Verifier recovers the signer's EVM address from a signature over a digest. +type Verifier interface { + RecoverSigner(digest [32]byte, sigHex string) (signerID string, err error) +} + +// CanonicalDigest computes the SHA-256 digest the owner signs for v, using a +// fixed-width, length-prefixed, domain-separated encoding (NOT fmt/concat): +// +// SHA-256( DST[32] ‖ u64be(Seq) ‖ manifest[32] ‖ file[32] ‖ u64be(Size) +// ‖ u8(len(PrevSig)) ‖ PrevSig ) +// +// ManifestHash and FileHash MUST be exactly 64 lowercase-or-mixed hex chars; +// anything else is rejected so attacker-controlled width can never shift the +// encoding. +func CanonicalDigest(v DatasetVersion) ([32]byte, error) { + var zero [32]byte + if v.Seq < 1 { + return zero, fmt.Errorf("dataset: seq must be >= 1, got %d", v.Seq) + } + if v.Size < 0 { + return zero, fmt.Errorf("dataset: size must be >= 0, got %d", v.Size) + } + mh, err := decodeSHA256Hex(v.ManifestHash) + if err != nil { + return zero, fmt.Errorf("dataset: manifestHash: %w", err) + } + fh, err := decodeSHA256Hex(v.FileHash) + if err != nil { + return zero, fmt.Errorf("dataset: fileHash: %w", err) + } + if len(v.PrevSig) > 255 { + return zero, fmt.Errorf("dataset: prevSig too long (%d > 255)", len(v.PrevSig)) + } + + var dst [32]byte + copy(dst[:], digestDST) + + h := sha256.New() + h.Write(dst[:]) + var u64 [8]byte + binary.BigEndian.PutUint64(u64[:], uint64(v.Seq)) + h.Write(u64[:]) + h.Write(mh) + h.Write(fh) + binary.BigEndian.PutUint64(u64[:], uint64(v.Size)) + h.Write(u64[:]) + h.Write([]byte{byte(len(v.PrevSig))}) + h.Write([]byte(v.PrevSig)) + + var out [32]byte + copy(out[:], h.Sum(nil)) + return out, nil +} + +// decodeSHA256Hex requires exactly 64 hex chars (a SHA-256) and returns the +// 32 raw bytes. +func decodeSHA256Hex(s string) ([]byte, error) { + if len(s) != 64 { + return nil, fmt.Errorf("want 64 hex chars, got %d", len(s)) + } + b, err := hex.DecodeString(s) + if err != nil { + return nil, err + } + return b, nil +} + +// Log is an append-only, in-memory sequence of signed dataset versions. +// Safe for concurrent use. +type Log struct { + mu sync.Mutex + versions []DatasetVersion +} + +// NewLog returns an empty log. +func NewLog() *Log { return &Log{} } + +// LogFromVersions rebuilds a log from persisted versions (e.g. loaded from +// disk). The versions are NOT re-verified here; call Verify for that. +func LogFromVersions(versions []DatasetVersion) *Log { + cp := make([]DatasetVersion, len(versions)) + copy(cp, versions) + return &Log{versions: cp} +} + +// Len returns the number of versions. +func (l *Log) Len() int { + l.mu.Lock() + defer l.mu.Unlock() + return len(l.versions) +} + +// Versions returns a copy of every version in sequence order. +func (l *Log) Versions() []DatasetVersion { + l.mu.Lock() + defer l.mu.Unlock() + out := make([]DatasetVersion, len(l.versions)) + copy(out, l.versions) + return out +} + +// Head returns the latest version, or ok==false when the log is empty. +func (l *Log) Head() (DatasetVersion, bool) { + l.mu.Lock() + defer l.mu.Unlock() + if len(l.versions) == 0 { + return DatasetVersion{}, false + } + return l.versions[len(l.versions)-1], true +} + +// Get returns the version with the given Seq (1-based), or ok==false. +func (l *Log) Get(seq int) (DatasetVersion, bool) { + l.mu.Lock() + defer l.mu.Unlock() + if seq < 1 || seq > len(l.versions) { + return DatasetVersion{}, false + } + return l.versions[seq-1], true +} + +// Append builds the next version (Seq = len+1, chained to the prior +// signature), signs it with signer, and appends it. The hex digests are +// lowercased for stable byte-comparison downstream. +func (l *Log) Append(manifestHash, fileHash string, size int64, signer Signer, now time.Time) (DatasetVersion, error) { + l.mu.Lock() + defer l.mu.Unlock() + + prevSig := "" + if n := len(l.versions); n > 0 { + prevSig = l.versions[n-1].Signature + } + v := DatasetVersion{ + Seq: len(l.versions) + 1, + ManifestHash: strings.ToLower(strings.TrimSpace(manifestHash)), + FileHash: strings.ToLower(strings.TrimSpace(fileHash)), + Size: size, + PrevSig: prevSig, + Timestamp: now.UTC(), + } + digest, err := CanonicalDigest(v) + if err != nil { + return DatasetVersion{}, err + } + sig, err := signer.SignDigest(digest) + if err != nil { + return DatasetVersion{}, fmt.Errorf("dataset: sign version %d: %w", v.Seq, err) + } + v.Signature = sig + l.versions = append(l.versions, v) + return v, nil +} + +// ErrEmptyLog is returned by Verify when there are no versions. +var ErrEmptyLog = errors.New("dataset: version log is empty") + +// Verify walks the log v1..head and rejects it unless every entry: has a +// monotonic Seq starting at 1, chains correctly to its predecessor's +// Signature, carries a signature recoverable to expectedSigner, and has a +// valid canonical digest. This detects offline any reorder, middle removal, +// or tamper of a fetched log. (Tail truncation yields a valid shorter chain +// and is caught instead by comparing the advertised head version, not here.) +// expectedSigner == "" skips the owner-identity check (signature validity +// only). +func (l *Log) Verify(verifier Verifier, expectedSigner string) error { + l.mu.Lock() + defer l.mu.Unlock() + if len(l.versions) == 0 { + return ErrEmptyLog + } + want := strings.ToLower(strings.TrimSpace(expectedSigner)) + prevSig := "" + for i, v := range l.versions { + if v.Seq != i+1 { + return fmt.Errorf("dataset: version at index %d has non-monotonic seq %d", i, v.Seq) + } + if v.PrevSig != prevSig { + return fmt.Errorf("dataset: version %d: chain break (prevSig does not match predecessor)", v.Seq) + } + digest, err := CanonicalDigest(v) + if err != nil { + return fmt.Errorf("dataset: version %d: %w", v.Seq, err) + } + signer, err := verifier.RecoverSigner(digest, v.Signature) + if err != nil { + return fmt.Errorf("dataset: version %d: bad signature: %w", v.Seq, err) + } + if want != "" && strings.ToLower(signer) != want { + return fmt.Errorf("dataset: version %d: signed by %s, want owner %s", v.Seq, signer, want) + } + prevSig = v.Signature + } + return nil +} diff --git a/internal/dataset/versionlog_test.go b/internal/dataset/versionlog_test.go new file mode 100644 index 00000000..3a8c1559 --- /dev/null +++ b/internal/dataset/versionlog_test.go @@ -0,0 +1,208 @@ +package dataset + +import ( + "strings" + "testing" + "time" + + ethcrypto "github.com/ethereum/go-ethereum/crypto" +) + +const ( + hashA = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + hashB = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + hashC = "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" +) + +func newTestSigner(t *testing.T) *EthSigner { + t.Helper() + key, err := ethcrypto.GenerateKey() + if err != nil { + t.Fatalf("GenerateKey: %v", err) + } + return NewEthSigner(key) +} + +var fixedTime = time.Date(2026, 6, 14, 12, 0, 0, 0, time.UTC) + +func TestCanonicalDigest_RejectsMalformedHashes(t *testing.T) { + tests := []struct { + name string + v DatasetVersion + }{ + {"short manifest", DatasetVersion{Seq: 1, ManifestHash: "abcd", FileHash: hashB}}, + {"non-hex manifest", DatasetVersion{Seq: 1, ManifestHash: strings.Repeat("z", 64), FileHash: hashB}}, + {"short file", DatasetVersion{Seq: 1, ManifestHash: hashA, FileHash: "ff"}}, + {"empty file", DatasetVersion{Seq: 1, ManifestHash: hashA, FileHash: ""}}, + {"seq zero", DatasetVersion{Seq: 0, ManifestHash: hashA, FileHash: hashB}}, + {"negative size", DatasetVersion{Seq: 1, ManifestHash: hashA, FileHash: hashB, Size: -1}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if _, err := CanonicalDigest(tt.v); err == nil { + t.Errorf("CanonicalDigest(%+v) = nil err, want rejection", tt.v) + } + }) + } +} + +func TestCanonicalDigest_DeterministicAndFieldSensitive(t *testing.T) { + base := DatasetVersion{Seq: 2, ManifestHash: hashA, FileHash: hashB, Size: 1048576, PrevSig: "deadbeef"} + d1, err := CanonicalDigest(base) + if err != nil { + t.Fatalf("digest: %v", err) + } + d2, _ := CanonicalDigest(base) + if d1 != d2 { + t.Fatal("digest not deterministic for identical input") + } + + // Each field change must move the digest (no canonicalization collision). + mutated := []DatasetVersion{ + {Seq: 3, ManifestHash: hashA, FileHash: hashB, Size: 1048576, PrevSig: "deadbeef"}, + {Seq: 2, ManifestHash: hashC, FileHash: hashB, Size: 1048576, PrevSig: "deadbeef"}, + {Seq: 2, ManifestHash: hashA, FileHash: hashC, Size: 1048576, PrevSig: "deadbeef"}, + {Seq: 2, ManifestHash: hashA, FileHash: hashB, Size: 1048577, PrevSig: "deadbeef"}, + {Seq: 2, ManifestHash: hashA, FileHash: hashB, Size: 1048576, PrevSig: "cafebabe"}, + } + for i, m := range mutated { + d, err := CanonicalDigest(m) + if err != nil { + t.Fatalf("mutated[%d] digest: %v", i, err) + } + if d == d1 { + t.Errorf("mutated[%d] produced the same digest as base — field not bound", i) + } + } +} + +// TestCanonicalDigest_NoConcatAmbiguity proves the length-prefix defeats the +// classic concat footgun: moving a char across the Size/PrevSig boundary must +// change the digest. (Two versions whose naive ":"-join would collide.) +func TestCanonicalDigest_NoConcatAmbiguity(t *testing.T) { + a := DatasetVersion{Seq: 1, ManifestHash: hashA, FileHash: hashB, Size: 12, PrevSig: "3abc"} + b := DatasetVersion{Seq: 1, ManifestHash: hashA, FileHash: hashB, Size: 123, PrevSig: "abc"} + da, _ := CanonicalDigest(a) + db, _ := CanonicalDigest(b) + if da == db { + t.Fatal("length-prefix failed: ambiguous Size/PrevSig boundary collided") + } +} + +func TestLog_AppendChainsAndVerifies(t *testing.T) { + signer := newTestSigner(t) + log := NewLog() + + v1, err := log.Append(hashA, hashA, 100, signer, fixedTime) + if err != nil { + t.Fatalf("append v1: %v", err) + } + if v1.Seq != 1 || v1.PrevSig != "" { + t.Errorf("v1 seq=%d prevSig=%q, want 1 and empty", v1.Seq, v1.PrevSig) + } + v2, err := log.Append(hashB, hashB, 200, signer, fixedTime) + if err != nil { + t.Fatalf("append v2: %v", err) + } + if v2.Seq != 2 || v2.PrevSig != v1.Signature { + t.Errorf("v2 not chained to v1: prevSig=%q want %q", v2.PrevSig, v1.Signature) + } + if _, err := log.Append(hashC, hashC, 300, signer, fixedTime); err != nil { + t.Fatalf("append v3: %v", err) + } + + if err := log.Verify(EthVerifier{}, signer.SignerID()); err != nil { + t.Errorf("Verify on a clean chain failed: %v", err) + } + if head, ok := log.Head(); !ok || head.Seq != 3 { + t.Errorf("Head seq = %v (ok=%v), want 3", head.Seq, ok) + } +} + +func TestLog_Verify_DetectsTamper(t *testing.T) { + signer := newTestSigner(t) + log := NewLog() + _, _ = log.Append(hashA, hashA, 100, signer, fixedTime) + _, _ = log.Append(hashB, hashB, 200, signer, fixedTime) + + // Mutate a field on the persisted copy and rebuild a log from it. + versions := log.Versions() + versions[1].Size = 999999 // attacker inflates the size after signing + tampered := LogFromVersions(versions) + if err := tampered.Verify(EthVerifier{}, signer.SignerID()); err == nil { + t.Error("Verify accepted a tampered Size — signature not binding") + } +} + +func TestLog_Verify_DetectsReorderAndMiddleRemoval(t *testing.T) { + signer := newTestSigner(t) + log := NewLog() + _, _ = log.Append(hashA, hashA, 100, signer, fixedTime) + _, _ = log.Append(hashB, hashB, 200, signer, fixedTime) + _, _ = log.Append(hashC, hashC, 300, signer, fixedTime) + versions := log.Versions() + + t.Run("reorder", func(t *testing.T) { + swapped := []DatasetVersion{versions[0], versions[2], versions[1]} + if err := LogFromVersions(swapped).Verify(EthVerifier{}, signer.SignerID()); err == nil { + t.Error("Verify accepted a reordered log") + } + }) + t.Run("middle removal", func(t *testing.T) { + gapped := []DatasetVersion{versions[0], versions[2]} + if err := LogFromVersions(gapped).Verify(EthVerifier{}, signer.SignerID()); err == nil { + t.Error("Verify accepted a log with a removed middle entry") + } + }) +} + +func TestLog_Verify_RejectsWrongSigner(t *testing.T) { + owner := newTestSigner(t) + attacker := newTestSigner(t) + log := NewLog() + _, _ = log.Append(hashA, hashA, 100, attacker, fixedTime) // signed by the wrong key + + if err := log.Verify(EthVerifier{}, owner.SignerID()); err == nil { + t.Error("Verify accepted a version signed by a non-owner key") + } + // ...but accepts it when we don't pin the owner (signature-validity only). + if err := log.Verify(EthVerifier{}, ""); err != nil { + t.Errorf("Verify with no owner pin rejected a structurally valid sig: %v", err) + } +} + +func TestLog_Verify_EmptyLog(t *testing.T) { + if err := NewLog().Verify(EthVerifier{}, ""); err != ErrEmptyLog { + t.Errorf("empty log Verify = %v, want ErrEmptyLog", err) + } +} + +// FuzzCanonicalDigest asserts the encoder never panics on arbitrary hash +// inputs and is total: valid 64-hex pairs always digest, everything else is a +// clean error. +func FuzzCanonicalDigest(f *testing.F) { + f.Add(hashA, hashB, int64(100), "") + f.Add("abcd", "", int64(0), "ff") + f.Add(strings.Repeat("0", 64), strings.Repeat("f", 64), int64(1<<40), strings.Repeat("9", 130)) + f.Fuzz(func(t *testing.T, manifest, file string, size int64, prevSig string) { + v := DatasetVersion{Seq: 1, ManifestHash: manifest, FileHash: file, Size: size, PrevSig: prevSig} + _, err := CanonicalDigest(v) + valid := len(manifest) == 64 && len(file) == 64 && + isHex(manifest) && isHex(file) && size >= 0 && len(prevSig) <= 255 + if valid && err != nil { + t.Errorf("valid input rejected: %v", err) + } + if !valid && err == nil { + t.Errorf("invalid input accepted: manifest=%q file=%q size=%d prevSigLen=%d", manifest, file, size, len(prevSig)) + } + }) +} + +func isHex(s string) bool { + for _, c := range s { + if !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F')) { + return false + } + } + return true +} diff --git a/internal/embed/skills/dataset-anonymize/SKILL.md b/internal/embed/skills/dataset-anonymize/SKILL.md new file mode 100644 index 00000000..d811cdc7 --- /dev/null +++ b/internal/embed/skills/dataset-anonymize/SKILL.md @@ -0,0 +1,63 @@ +--- +name: dataset-anonymize +description: Anonymize a dataset's JSONL (PII detection + masking) before publishing or selling it with `obol dataset`. Pluggable detector — built-in regex redactor by default, or a BYO Hugging Face token-classification model. +--- + +# dataset-anonymize + +Strip personally-identifying information from a training dataset **before** it +is published or sold. Runs as the last privacy stage over the export bundle's +`*.jsonl` artifact, replacing detected PII spans with typed placeholders +(``, ``, …) so the bytes that leave the host carry no raw +secrets. + +This is a **pluggable** stage: + +- **Default (no setup):** a built-in, dependency-free regex redactor covers the + common high-signal categories (emails, IPs, credit-card / IBAN-shaped + numbers, US-SSN-shaped numbers, bearer/API-key-shaped tokens, private keys, + absolute home paths, phone numbers). +- **ML-grade (opt-in):** set `OBOL_ANONYMIZER_MODEL` to any Hugging Face + token-classification PII model and the script runs it via + `transformers.pipeline("token-classification", …)`, unioning its spans with + the regex pass. The model cache lands under the obol data dir (see below) so + it survives across runs and is never re-downloaded per invocation. + +The detector is replaceable by design: a stricter custom detector is "implement +a `detect(text) -> spans` and register it," not "edit a config row" — author a +sibling script and point `--detector` at it. + +## Usage + +```bash +# Default regex redactor: +python3 scripts/anonymize.py input.jsonl anonymized.jsonl --report + +# ML-grade detection with a BYO model: +export OBOL_ANONYMIZER_MODEL="/" +python3 scripts/anonymize.py input.jsonl anonymized.jsonl --report + +# Then ingest the anonymized bundle and publish it: +obol dataset from --name my-dataset +obol dataset publish my-dataset +``` + +Each input line is a JSON object; the script masks string values under +`messages[].content`, `text`, `input`, `output`, and `completion` (the common +chat/instruction fields) and leaves structure untouched. Anonymization is +deterministic within a run: the same raw value maps to the same placeholder +index, so cross-message references stay linkable without revealing the value. + +## Model cache convention + +The script exports `HF_HOME="$OBOL_DATA_DIR/cache/huggingface"` (falling back to +`$XDG_CACHE_HOME/obol/huggingface`, then `~/.cache/obol/huggingface`) before +loading any model, so downloads land under the standard obol data dir. + +## Honest limits + +Recall is bounded by the detector. The regex pass catches structured PII, not +free-text names/addresses — for those, supply a model via +`OBOL_ANONYMIZER_MODEL`. `--report` prints per-category masked counts so an +operator can sanity-check coverage before selling. Validate against your own +data; the contract is preserved so a stricter detector can replace the default. diff --git a/internal/embed/skills/dataset-anonymize/scripts/anonymize.py b/internal/embed/skills/dataset-anonymize/scripts/anonymize.py new file mode 100644 index 00000000..00a3b873 --- /dev/null +++ b/internal/embed/skills/dataset-anonymize/scripts/anonymize.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +"""Anonymize a dataset JSONL before publishing/selling it. + +Replaces PII spans in the common chat/instruction text fields with typed, +deterministically-indexed placeholders (, , ...). + +Default: a dependency-free regex redactor. Opt-in ML detection: set +OBOL_ANONYMIZER_MODEL to a Hugging Face token-classification PII model. + +Usage: + anonymize.py [--model ID] [--report] +""" +import argparse +import json +import os +import re +import sys + +# Target string fields walked in each record (chat + instruction shapes). +TEXT_FIELDS = ("content", "text", "input", "output", "completion", "prompt", "response") + +# High-signal structured PII. Order matters: earlier wins on overlap. +REGEX = [ + ("PRIVATE_KEY", re.compile(r"-----BEGIN[ A-Z]*PRIVATE KEY-----.*?-----END[ A-Z]*PRIVATE KEY-----", re.S)), + ("ETH_KEY", re.compile(r"\b0x[0-9a-fA-F]{64}\b")), + ("EMAIL", re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b")), + ("AWS_KEY", re.compile(r"\bAKIA[0-9A-Z]{16}\b")), + ("GH_TOKEN", re.compile(r"\bgh[pousr]_[A-Za-z0-9]{20,}\b")), + ("OPENAI_KEY", re.compile(r"\bsk-[A-Za-z0-9_\-]{16,}\b")), + ("BEARER", re.compile(r"(?i)\bbearer\s+[A-Za-z0-9._\-]{16,}\b")), + ("SSN", re.compile(r"\b\d{3}-\d{2}-\d{4}\b")), + ("CREDIT_CARD", re.compile(r"\b(?:\d[ -]?){13,19}\b")), + ("IPV4", re.compile(r"\b(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}(?:25[0-5]|2[0-4]\d|1?\d?\d)\b")), + ("PHONE", re.compile(r"\b\+?1?[-.\s]?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b")), + ("HOME_PATH", re.compile(r"(?:/Users/|/home/)[^/\s\"']+")), +] + + +def detect_regex(text): + spans = [] + for label, pat in REGEX: + for m in pat.finditer(text): + spans.append((m.start(), m.end(), label)) + return spans + + +def detect_model(text, pipe): + spans = [] + try: + for ent in pipe(text): + label = str(ent.get("entity_group") or ent.get("entity") or "PII").upper() + spans.append((int(ent["start"]), int(ent["end"]), label)) + except Exception as e: # model failure must never crash the run + print(f" warning: model detection failed on a span: {e}", file=sys.stderr) + return spans + + +def resolve_spans(spans): + """Drop overlaps, keeping the earliest-listed (highest-priority) span.""" + spans = sorted(spans, key=lambda s: (s[0], -(s[1] - s[0]))) + out, last_end = [], -1 + for start, end, label in spans: + if start >= last_end: + out.append((start, end, label)) + last_end = end + return out + + +def mask(text, spans, registry, counts): + """Replace spans with deterministic placeholders.""" + for start, end, label in sorted(resolve_spans(spans), key=lambda s: s[0], reverse=True): + raw = text[start:end] + per = registry.setdefault(label, {}) + if raw not in per: + per[raw] = len(per) + 1 + counts[label] = counts.get(label, 0) + 1 + text = text[:start] + f"<{label}_{per[raw]}>" + text[end:] + return text + + +def anonymize_value(value, pipe, registry, counts): + if not isinstance(value, str) or not value: + return value + spans = detect_regex(value) + if pipe is not None: + spans += detect_model(value, pipe) + return mask(value, spans, registry, counts) + + +def walk(obj, pipe, registry, counts): + if isinstance(obj, dict): + return {k: (anonymize_value(v, pipe, registry, counts) if k in TEXT_FIELDS else walk(v, pipe, registry, counts)) for k, v in obj.items()} + if isinstance(obj, list): + return [walk(v, pipe, registry, counts) for v in obj] + return obj + + +def set_hf_home(): + if os.environ.get("HF_HOME"): + return + base = os.environ.get("OBOL_DATA_DIR") + if base: + home = os.path.join(base, "cache", "huggingface") + else: + xdg = os.environ.get("XDG_CACHE_HOME") or os.path.expanduser("~/.cache") + home = os.path.join(xdg, "obol", "huggingface") + os.makedirs(home, exist_ok=True) + os.environ["HF_HOME"] = home + + +def load_pipeline(model_id): + if not model_id: + return None + set_hf_home() + try: + from transformers import pipeline # type: ignore + except Exception: + print(" warning: transformers not installed — falling back to regex-only", file=sys.stderr) + return None + print(f" loading PII model {model_id} (cache: {os.environ['HF_HOME']}) …", file=sys.stderr) + return pipeline("token-classification", model=model_id, aggregation_strategy="simple") + + +def main(): + ap = argparse.ArgumentParser(description="Anonymize a dataset JSONL") + ap.add_argument("input") + ap.add_argument("output") + ap.add_argument("--model", default=os.environ.get("OBOL_ANONYMIZER_MODEL", "")) + ap.add_argument("--report", action="store_true") + args = ap.parse_args() + + pipe = load_pipeline(args.model) + registry, counts, n = {}, {}, 0 + with open(args.input) as fin, open(args.output, "w") as fout: + for line in fin: + line = line.strip() + if not line: + continue + rec = json.loads(line) + fout.write(json.dumps(walk(rec, pipe, registry, counts), ensure_ascii=False) + "\n") + n += 1 + + masked = sum(counts.values()) + print(f"anonymized {n} record(s); masked {masked} PII span(s)" + (" via regex" if pipe is None else " via model+regex")) + if args.report: + for label in sorted(counts): + print(f" {label:14s} {counts[label]}") + + +if __name__ == "__main__": + main() diff --git a/internal/embed/skills/finetune-backend/SKILL.md b/internal/embed/skills/finetune-backend/SKILL.md new file mode 100644 index 00000000..56460e42 --- /dev/null +++ b/internal/embed/skills/finetune-backend/SKILL.md @@ -0,0 +1,57 @@ +--- +name: finetune-backend +description: Fine-tune a local model on a purchased/owned dataset through one pluggable backend contract (mock, mlx-lora, unsloth, axolotl, torchtune). Emits adapter + eval + a run.manifest binding the output to the dataset's content-address. +--- + +# finetune-backend + +Run a LoRA/SFT fine-tune over a dataset's `sft.jsonl` artifact through a single +thin contract, selectable per machine: + +``` +run(dataset_path, base_model, hyperparams) -> { adapter, eval_metric, run.manifest } +``` + +Every backend reads the **same** JSONL artifact (the bytes you downloaded with +`obol buy dataset`), so swapping backends never reshapes your data. The runner +binds each result to the exact dataset it trained on by writing the dataset's +content-address (`manifestHash`) into `run.manifest` — the provenance link from +a served/sold model back to the data that produced it. + +## Backends + +| `--backend` | Tool | Hardware | Notes | +|---|---|---|---| +| `mock` *(default)* | none | any | validates the contract + provenance with no framework; emits a deterministic stub adapter + eval. Use in CI/smoke. | +| `mlx-lora` | MLX-LM | Apple silicon | near-native chat-JSONL; native LoRA | +| `unsloth` | Unsloth | NVIDIA | fast QLoRA; on GB10 (sm_121) run eager (FA3 has no kernel) | +| `axolotl` | axolotl | multi-GPU | YAML-config; exposes grad-accum | +| `torchtune` | torchtune | NVIDIA | modular recipes; guard `torch.compile` | + +The only thing shared across real backends is "regex-extract the eval metric +from the backend CLI's stdout" — each backend's command + file layout is +otherwise its own. Add a backend by registering one `(build_cmd, metric_regex)` +pair in `BACKENDS`. + +## Usage + +```bash +# Contract/provenance smoke (no GPU, no framework): +python3 scripts/runner.py --backend mock \ + --dataset my-dataset-v1.jsonl --base-model qwen2.5-0.5b \ + --manifest-hash --out ./run + +# Real run on a GPU box: +python3 scripts/runner.py --backend unsloth \ + --dataset my-dataset-v1.jsonl --base-model unsloth/Qwen2.5-0.5B \ + --manifest-hash --lora-rank 16 --epochs 1 --out ./run + +cat ./run/run.manifest # dataset_hash == the version you bought +cat ./run/eval.json # {eval_loss, backend, base_model} +``` + +`run.manifest` is the exact deliverable shape the bounty `finetune@v1` task +declares (`adapter.safetensors` + `eval.json` + `run.manifest` with +`dataset_hash`), so a standalone run and a verified/bounty run stay consistent. +A `--dry-run` validates the dataset and emits the manifest without invoking the +backend. diff --git a/internal/embed/skills/finetune-backend/scripts/runner.py b/internal/embed/skills/finetune-backend/scripts/runner.py new file mode 100644 index 00000000..514253ae --- /dev/null +++ b/internal/embed/skills/finetune-backend/scripts/runner.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +"""Pluggable fine-tune backend runner. + +One contract over several backends: read a dataset's sft.jsonl, run a LoRA/SFT +fine-tune, and emit adapter + eval.json + run.manifest. run.manifest binds the +result to the dataset's content-address (manifestHash) — the provenance link +from a model back to the data it was trained on. + +The only thing shared across real backends is "regex-extract the eval metric +from stdout"; each backend otherwise has its own command + layout. + +Usage: + runner.py --backend --dataset --base-model + [--manifest-hash H] [--lora-rank N] [--epochs E] [--lr LR] + [--out DIR] [--dry-run] +""" +import argparse +import hashlib +import json +import os +import re +import subprocess +import sys +import time + + +def file_sha256(path): + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(1 << 20), b""): + h.update(chunk) + return h.hexdigest() + + +def count_records(path): + n = 0 + with open(path) as f: + for line in f: + if line.strip(): + json.loads(line) # validate JSONL + n += 1 + return n + + +# --- backend command builders: (argv, eval_loss_regex) --- + +def build_mlx(ds, base, hp, out): + argv = ["mlx_lm.lora", "--model", base, "--train", "--data", os.path.dirname(ds) or ".", + "--iters", str(hp["epochs"] * 100), "--adapter-path", out] + return argv, re.compile(r"[Vv]al loss[:\s]+([0-9.]+)") + + +def build_unsloth(ds, base, hp, out): + argv = [sys.executable, "-m", "unsloth.cli", "--model", base, "--dataset", ds, + "--lora-rank", str(hp["lora_rank"]), "--epochs", str(hp["epochs"]), + "--lr", str(hp["lr"]), "--output", out] + return argv, re.compile(r"eval_loss['\"]?\s*[:=]\s*([0-9.]+)") + + +def build_axolotl(ds, base, hp, out): + argv = ["accelerate", "launch", "-m", "axolotl.cli.train", os.path.join(out, "axolotl.yaml")] + return argv, re.compile(r"eval_loss['\"]?\s*[:=]\s*([0-9.]+)") + + +def build_torchtune(ds, base, hp, out): + argv = ["tune", "run", "lora_finetune_single_device", "--config", os.path.join(out, "tune.yaml")] + return argv, re.compile(r"eval_loss['\"]?\s*[:=]\s*([0-9.]+)") + + +BACKENDS = { + "mlx-lora": build_mlx, + "unsloth": build_unsloth, + "axolotl": build_axolotl, + "torchtune": build_torchtune, +} + + +def run_real(backend, ds, base, hp, out): + build = BACKENDS[backend] + argv, metric_re = build(ds, base, hp, out) + print(f" $ {' '.join(argv)}", file=sys.stderr) + proc = subprocess.run(argv, capture_output=True, text=True) + sys.stderr.write(proc.stderr[-2000:]) + combined = proc.stdout + "\n" + proc.stderr + m = None + for mm in metric_re.finditer(combined): + m = mm # last match wins (final eval) + if proc.returncode != 0: + raise SystemExit(f"backend {backend} exited {proc.returncode}") + if not m: + raise SystemExit(f"backend {backend} produced no eval metric on stdout") + return float(m.group(1)) + + +def run_mock(ds, base, hp, out, records): + # Deterministic, framework-free: synthesize a plausible eval_loss from the + # data + hyperparams so the contract + provenance can be validated anywhere. + seed = int(file_sha256(ds)[:8], 16) + eval_loss = round(1.5 + (seed % 1000) / 2000.0 - 0.02 * hp["epochs"], 6) + with open(os.path.join(out, "adapter.safetensors"), "wb") as f: + f.write(b"OBOL-MOCK-ADAPTER\x00" + ds.encode() + b"\x00" + base.encode()) + print(f" mock backend: {records} records, base {base} -> eval_loss {eval_loss}", file=sys.stderr) + return eval_loss + + +def main(): + ap = argparse.ArgumentParser(description="Pluggable fine-tune backend runner") + ap.add_argument("--backend", default="mock", choices=["mock"] + list(BACKENDS)) + ap.add_argument("--dataset", required=True) + ap.add_argument("--base-model", required=True) + ap.add_argument("--manifest-hash", default="", help="dataset content-address (manifestHash) to bind into run.manifest") + ap.add_argument("--lora-rank", type=int, default=16) + ap.add_argument("--epochs", type=int, default=1) + ap.add_argument("--lr", type=float, default=2e-4) + ap.add_argument("--out", default="./run") + ap.add_argument("--dry-run", action="store_true") + args = ap.parse_args() + + os.makedirs(args.out, exist_ok=True) + records = count_records(args.dataset) + if records == 0: + raise SystemExit("dataset is empty") + dataset_file_hash = file_sha256(args.dataset) + hp = {"lora_rank": args.lora_rank, "epochs": args.epochs, "lr": args.lr} + + eval_loss = None + if args.dry_run: + print(f" dry-run: {records} valid records, would run {args.backend}", file=sys.stderr) + elif args.backend == "mock": + eval_loss = run_mock(args.dataset, args.base_model, hp, args.out, records) + else: + eval_loss = run_real(args.backend, args.dataset, args.base_model, hp, args.out) + + eval_json = {"eval_loss": eval_loss, "backend": args.backend, "base_model": args.base_model, "records": records} + with open(os.path.join(args.out, "eval.json"), "w") as f: + json.dump(eval_json, f, indent=2) + + manifest = { + # The provenance link: this fine-tune is bound to exactly the dataset + # version it trained on. + "dataset_hash": args.manifest_hash or dataset_file_hash, + "dataset_file_hash": dataset_file_hash, + "base_model": args.base_model, + "backend": args.backend, + "hyperparams": hp, + "eval_loss": eval_loss, + "adapter": "adapter.safetensors", + "records": records, + "created_at": int(time.time()), + } + with open(os.path.join(args.out, "run.manifest"), "w") as f: + json.dump(manifest, f, indent=2) + + print(json.dumps({"out": args.out, "backend": args.backend, "eval_loss": eval_loss, + "dataset_hash": manifest["dataset_hash"]})) + + +if __name__ == "__main__": + main() diff --git a/internal/research/groupauth/groupauth.go b/internal/research/groupauth/groupauth.go index ebc7863e..146b1cc2 100644 --- a/internal/research/groupauth/groupauth.go +++ b/internal/research/groupauth/groupauth.go @@ -241,6 +241,56 @@ func (a *Authority) Revoke(rawToken string) { } } +// HashToken returns the SHA-256 hex hash of a raw member token. Exposed so a +// caller (e.g. a payment-gated service's entitlement map) can key off the +// same hash the Authority stores without ever holding the raw token. +func HashToken(rawToken string) string { return hashToken(rawToken) } + +// Mint issues a member token for groupID WITHOUT the device-auth flow, for +// services where a settled payment — not an owner approval — is the +// membership decision. The raw token is returned exactly once; only its hash +// is retained (returned too, so the caller can persist it). label is a +// non-trusted descriptive tag. +func (a *Authority) Mint(groupID, label string) (rawToken, tokenHash string, err error) { + raw, err := randomHex(32) + if err != nil { + return "", "", err + } + rawToken = tokenPrefix + raw + tokenHash = hashToken(rawToken) + + a.mu.Lock() + defer a.mu.Unlock() + a.tokens[tokenHash] = &memberToken{ + TokenHash: tokenHash, + GroupID: strings.TrimSpace(groupID), + Label: strings.TrimSpace(label), + IssuedAt: a.now(), + Active: true, + } + return rawToken, tokenHash, nil +} + +// RegisterHash re-registers a previously issued token by its hash, marking it +// active for groupID. It lets a persistent store rehydrate the in-memory +// Authority after a restart without ever seeing the raw token (the Authority +// verifies by hash). A blank hash is ignored; otherwise idempotent. +func (a *Authority) RegisterHash(tokenHash, groupID, label string) { + tokenHash = strings.TrimSpace(tokenHash) + if tokenHash == "" { + return + } + a.mu.Lock() + defer a.mu.Unlock() + a.tokens[tokenHash] = &memberToken{ + TokenHash: tokenHash, + GroupID: strings.TrimSpace(groupID), + Label: strings.TrimSpace(label), + IssuedAt: a.now(), + Active: true, + } +} + // --- helpers --- func generateUserCode() (string, error) { diff --git a/plans/dataset-subscription-v1.1-pitch.md b/plans/dataset-subscription-v1.1-pitch.md new file mode 100644 index 00000000..ded2be59 --- /dev/null +++ b/plans/dataset-subscription-v1.1-pitch.md @@ -0,0 +1,110 @@ +# V1.1 — Continuous Dataset Subscriptions (escrow), a diagram pitch + +**Status:** pitch only — held until the batch-settlement payout leg lands. +**v1 (shipped P1–P6):** a dataset is sold **per version** — one payment buys +exactly the version it was scoped to. That is the right primitive for a +snapshot. It is the *wrong* primitive for a **living** dataset that ships a new +version every week and wants subscribers, not one-off buyers. + +--- + +## 1. The gap v1 leaves + +``` + v1 today: buyer ── pays atomic(v3) ──▶ token{maxVersion=3} ──▶ download v3 + new v4 ships ──▶ buyer must re-probe, re-pay atomic(v4) ❌ friction +``` + +A continuously-sold dataset wants: **pay once, keep receiving** — but without +the buyer pre-funding a platform (no custody) and without the buyer paying for +versions that never ship (no "pay upfront and hope"). + +That is exactly the shape escrow was built for: **authorize ≠ capture.** + +--- + +## 2. The mechanism: reserve → capture-per-epoch → void + +``` + SUBSCRIBE (once) + buyer ── signs ONE Permit2 voucher ─────────────────────────────▶ x402-escrow + { recipient: seller, (non-custodial: + max: price_per_epoch × N, holds the SIGNATURE, + deadlines: [d1, d2, … dN] } not the money) + │ + ┌───────────────────────── per new version (epoch) ─────────────────────┤ + │ seller ships v_k ─▶ controller appends signed version k │ + │ escrow CAPTURE(epoch k): transfer price_per_epoch buyer ─▶ seller │ (one recipient, + │ entitlement top-up: token.maxVersion = k │ one settlement, + │ buyer's existing token now downloads v_k — no re-pay │ per epoch) + └───────────────────────────────────────────────────────────────────────┤ + │ + UNSUBSCRIBE / seller stops shipping │ + buyer (or timeout) ── VOID remaining deadlines ─────────────────────────▶ no further capture + → buyer paid for delivered versions ONLY (funds never moved + for undelivered epochs) +``` + +**The invariant that makes it fair:** money moves **only** when a new version +is actually appended to the signed log. No version, no capture. The buyer's +worst case is bounded to `price_per_epoch × (versions actually shipped)`, and +the seller cannot capture ahead of delivery. + +--- + +## 3. What it reuses vs. what is genuinely new + +``` + REUSED (already shipped): + ├─ version log (P2) ........... defines an "epoch" = a new signed Seq + ├─ entitlement map (P2) ....... capture's side effect = token.maxVersion += 1 + ├─ x402-escrow facilitator .... reserve / capture / void, Permit2, non-custodial + └─ ForwardAuth + catalog ...... the priced offer, unchanged + + NEW (the v1.1 cost, stated honestly): + └─ a per-epoch, single-recipient CAPTURE LOOP keyed by subscription id + (Reserve a multi-deadline voucher; Capture once per shipped version to + the one seller; Void the tail on unsubscribe) +``` + +This is **not** `CaptureBatch` reuse. `CaptureBatch` splits *one* held auth +across *k recipients* in *one* settlement (e.g. an evaluator panel). A +subscription is the orthogonal shape: *one recipient*, *k settlements over +time*. v1.1 therefore needs new escrow wiring — a small loop, but real code, not +"zero-cost reuse." That honesty is why it is held, not hand-waved into v1. + +--- + +## 4. Why this is the right shape (and the moat) + +``` + prepaid credits (HF/cloud): pay upfront ─▶ platform custody ─▶ hope for value + v1.1 escrow subscription: authorize ─▶ NO custody ─▶ capture on delivery +``` + +- **No platform float, no custody honeypot** — the facilitator holds a + signature, never the money; capture moves funds owner→seller directly. +- **Pay for delivered value** — the seller is paid per shipped version; the + buyer is refunded-by-default (void) for versions never shipped. +- **Same wallet, same identity, same gate** — a subscription is just a + longer-lived entitlement; the `entitle()` download gate is unchanged. + +It is the dataset analogue of the metered-inference escrow pitch: *price the +outcome (a delivered version), not the prepayment.* + +--- + +## 5. Dependency & ask + +``` + blocked on ──▶ the batch-settlement PAYOUT leg (open; tracked in the + OpenRouter-direction work) — capture needs the same + settle-to-recipient path that leg finalizes. + + ask ───────▶ green-light v1.1 as the FIRST consumer of the payout leg once + it lands: it is a small, well-scoped capture loop on top of the + version log + entitlement map already shipped in P2. +``` + +One artifact, two uses, **and now a recurring revenue shape** — without anyone +fronting a deposit or taking custody.