diff --git a/cmd/obol/main.go b/cmd/obol/main.go index d3432da5..a6751acb 100644 --- a/cmd/obol/main.go +++ b/cmd/obol/main.go @@ -325,6 +325,7 @@ GLOBAL OPTIONS:{{template "visibleFlagTemplate" .}}{{end}} openclawCommand(cfg), sellCommand(cfg), buyCommand(cfg), + researchCommand(cfg), modelCommand(cfg), { Name: "app", diff --git a/cmd/obol/research.go b/cmd/obol/research.go new file mode 100644 index 00000000..d9c51585 --- /dev/null +++ b/cmd/obol/research.go @@ -0,0 +1,397 @@ +package main + +// obol research — owner side of a decentralized auto-research program. +// +// obol research publish start the KB + membership server on this +// machine, expose it over a Cloudflare quick +// tunnel, print the public URL workers join. +// obol research approve admit a worker (membership decision). +// obol research status [] roster, results, champion, payouts. +// +// The server is the host gateway (same spirit as `obol sell inference`): it +// runs on the owner's machine, not in the cluster, and reaches remote +// workers over the real internet via Cloudflare — no tailscale, every KB +// route gated by a groupauth member token. + +import ( + "bufio" + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "os/signal" + "path/filepath" + "regexp" + "strings" + "syscall" + "time" + + "github.com/ObolNetwork/obol-stack/internal/config" + "github.com/ObolNetwork/obol-stack/internal/research/kb" + "github.com/ObolNetwork/obol-stack/internal/research/server" + "github.com/urfave/cli/v3" +) + +// researchState is persisted so `approve`/`status` can reach a running +// `publish` server. 
+type researchState struct { + Program string `json:"program"` + LocalAddr string `json:"local_addr"` // http://127.0.0.1:PORT + PublicURL string `json:"public_url"` // https://xxx.trycloudflare.com + OwnerToken string `json:"owner_token"` // gates approve/status +} + +func researchCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "research", + Usage: "Run a decentralized auto-research program (publish an ID, admit workers, collect results)", + Commands: []*cli.Command{ + researchPublishCommand(cfg), + researchApproveCommand(cfg), + researchStatusCommand(cfg), + }, + } +} + +func researchPublishCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "publish", + Usage: "Publish a research program: host the KB + membership server and expose it over a Cloudflare tunnel", + ArgsUsage: "", + Description: `Starts the program's knowledge-base + membership server on this machine +and opens a Cloudflare quick tunnel so workers on other obol-stacks can +join over the open internet. Runs in the foreground — Ctrl-C stops the +program. Approve joining workers from another shell with +'obol research approve '.`, + Flags: []cli.Flag{ + &cli.StringFlag{Name: "objective", Usage: "Free-text hypothesis space"}, + &cli.StringFlag{Name: "metric", Usage: "Metric name (e.g. 
val_bpb)", Required: true}, + &cli.StringFlag{Name: "direction", Usage: "minimize | maximize", Value: "minimize"}, + &cli.StringFlag{Name: "accept", Usage: "beats-champion | threshold", Value: "beats-champion"}, + &cli.FloatFlag{Name: "threshold", Usage: "Acceptance threshold (when --accept=threshold)"}, + &cli.FloatFlag{Name: "baseline", Usage: "Reference metric value; first improvement's impact is measured against it"}, + &cli.FloatFlag{Name: "pool", Usage: "Reward pool", Value: 0}, + &cli.StringFlag{Name: "token", Usage: "Reward token", Value: "OBOL"}, + &cli.StringFlag{Name: "network", Usage: "Payment chain", Value: "base-sepolia"}, + &cli.StringFlag{Name: "membership", Usage: "open | invite", Value: "invite"}, + &cli.StringFlag{Name: "split", Usage: "by-impact | champion-takes-all", Value: "by-impact"}, + &cli.IntFlag{Name: "port", Usage: "Local port (0 = pick a free one)", Value: 0}, + &cli.BoolFlag{Name: "no-tunnel", Usage: "Serve locally only (skip the Cloudflare tunnel)"}, + }, + Action: func(ctx context.Context, cmd *cli.Command) error { + u := getUI(cmd) + if cmd.NArg() != 1 { + return fmt.Errorf("program name required: obol research publish ") + } + name := strings.TrimSpace(cmd.Args().First()) + + prog, err := programFromFlags(name, cmd) + if err != nil { + return err + } + + ownerToken, err := randomToken("obol-research-owner-") + if err != nil { + return err + } + srv := server.New(prog, cmd.String("membership"), ownerToken, nil) + + ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.Int("port"))) + if err != nil { + return fmt.Errorf("listen: %w", err) + } + localAddr := "http://" + ln.Addr().String() + httpSrv := &http.Server{Handler: srv.Handler()} + go func() { _ = httpSrv.Serve(ln) }() + + publicURL := localAddr + var tunnel *exec.Cmd + if !cmd.Bool("no-tunnel") { + u.Info("Opening Cloudflare tunnel …") + turl, tcmd, terr := startQuickTunnel(ctx, ln.Addr().String()) + if terr != nil { + u.Warnf("tunnel failed (%v) — serving locally 
only at %s", terr, localAddr) + } else { + publicURL = turl + tunnel = tcmd + } + } + + st := researchState{Program: name, LocalAddr: localAddr, PublicURL: publicURL, OwnerToken: ownerToken} + if err := writeResearchState(cfg, st); err != nil { + u.Warnf("could not persist program state (approve/status from another shell may not work): %v", err) + } + + u.Successf("Research program %q published", name) + u.Infof("Public URL: %s", publicURL) + u.Infof("Metric: %s (%s, %s)", prog.Criteria.Metric, prog.Criteria.Direction, prog.Criteria.Accept) + u.Infof("Membership: %s", cmd.String("membership")) + u.Blank() + u.Bold("Workers join with:") + u.Printf(" python3 worker.py --kb %s --program %s --worker ", publicURL, name) + if cmd.String("membership") == server.MembershipInvite { + u.Dim("Approve each worker's printed code: obol research approve ") + } + u.Blank() + u.Dim("Ctrl-C to stop the program.") + + // Run until interrupted. + sigCtx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) + defer stop() + <-sigCtx.Done() + + u.Info("Stopping …") + shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = httpSrv.Shutdown(shutCtx) + if tunnel != nil && tunnel.Process != nil { + _ = tunnel.Process.Kill() + } + _ = removeResearchState(cfg, name) + return nil + }, + } +} + +func researchApproveCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "approve", + Usage: "Admit a worker to the program (the membership decision)", + ArgsUsage: "", + Flags: []cli.Flag{&cli.StringFlag{Name: "program", Usage: "Program name (defaults to the only running one)"}}, + Action: func(ctx context.Context, cmd *cli.Command) error { + u := getUI(cmd) + if cmd.NArg() != 1 { + return fmt.Errorf("user code required: obol research approve ") + } + st, err := loadResearchState(cfg, cmd.String("program")) + if err != nil { + return err + } + body, _ := json.Marshal(map[string]string{"user_code": cmd.Args().First()}) + resp, 
err := ownerPost(ctx, st, "/auth/device/approve", body) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + return fmt.Errorf("approve failed (%d): %s", resp.StatusCode, strings.TrimSpace(string(b))) + } + u.Successf("Approved %s into %q", cmd.Args().First(), st.Program) + return nil + }, + } +} + +func researchStatusCommand(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "status", + Usage: "Show roster, results, champion, and payouts", + ArgsUsage: "[]", + Action: func(ctx context.Context, cmd *cli.Command) error { + u := getUI(cmd) + st, err := loadResearchState(cfg, cmd.Args().First()) + if err != nil { + return err + } + resp, err := ownerGet(ctx, st, "/status") + if err != nil { + return err + } + defer resp.Body.Close() + var out struct { + Program kb.Program `json:"program"` + Roster []string `json:"roster"` + Results []kb.Result `json:"results"` + Champion *kb.Result `json:"champion"` + Payouts map[string]float64 `json:"payouts"` + } + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return err + } + u.Bold(fmt.Sprintf("Program %s — %s (%s)", out.Program.ID, out.Program.Criteria.Metric, out.Program.Criteria.Direction)) + u.Infof("Members: %s", strings.Join(out.Roster, ", ")) + if out.Champion != nil { + u.Successf("Champion: %s = %.6f (worker %s)", out.Program.Criteria.Metric, out.Champion.Value, out.Champion.Worker) + } else { + u.Dim("Champion: none yet") + } + for _, r := range out.Results { + mark := "·" + if r.Champion { + mark = "★" + } else if r.Accepted { + mark = "+" + } + u.Printf(" %s #%d %-10s %.6f impact %.6f", mark, r.Seq, r.Worker, r.Value, r.Impact) + } + if len(out.Payouts) > 0 { + u.Blank() + u.Bold("Payouts (" + string(out.Program.Split) + "):") + for w, amt := range out.Payouts { + u.Printf(" %-10s %.6f %s", w, amt, out.Program.Token) + } + } + return nil + }, + } +} + +// --- helpers --- + +func programFromFlags(name 
string, cmd *cli.Command) (kb.Program, error) { + dir := kb.Direction(cmd.String("direction")) + if dir != kb.Minimize && dir != kb.Maximize { + return kb.Program{}, fmt.Errorf("--direction must be minimize or maximize") + } + acc := kb.AcceptMode(cmd.String("accept")) + if acc != kb.BeatsChampion && acc != kb.Threshold { + return kb.Program{}, fmt.Errorf("--accept must be beats-champion or threshold") + } + split := kb.SplitMode(cmd.String("split")) + if split != kb.ByImpact && split != kb.ChampionTakesAll { + return kb.Program{}, fmt.Errorf("--split must be by-impact or champion-takes-all") + } + p := kb.Program{ + ID: name, + Objective: cmd.String("objective"), + Criteria: kb.Criteria{Metric: cmd.String("metric"), Direction: dir, Accept: acc}, + Pool: cmd.Float("pool"), + Token: cmd.String("token"), + Network: cmd.String("network"), + Split: split, + } + if cmd.IsSet("threshold") { + t := cmd.Float("threshold") + p.Criteria.Threshold = &t + } + if cmd.IsSet("baseline") { + b := cmd.Float("baseline") + p.Baseline = &b + } + return p, nil +} + +func randomToken(prefix string) (string, error) { + b := make([]byte, 24) + if _, err := rand.Read(b); err != nil { + return "", err + } + return prefix + hex.EncodeToString(b), nil +} + +var trycloudflareRe = regexp.MustCompile(`https://[a-z0-9-]+\.trycloudflare\.com`) + +// startQuickTunnel launches `cloudflared tunnel --url http://addr` and +// returns the public trycloudflare URL once it appears on stderr. 
+func startQuickTunnel(ctx context.Context, addr string) (string, *exec.Cmd, error) { + bin, err := exec.LookPath("cloudflared") + if err != nil { + if _, statErr := os.Stat("/opt/homebrew/bin/cloudflared"); statErr == nil { + bin = "/opt/homebrew/bin/cloudflared" + } else { + return "", nil, fmt.Errorf("cloudflared not found") + } + } + c := exec.CommandContext(ctx, bin, "tunnel", "--no-autoupdate", "--url", "http://"+addr) + stderr, err := c.StderrPipe() + if err != nil { + return "", nil, err + } + if err := c.Start(); err != nil { + return "", nil, err + } + urlCh := make(chan string, 1) + go func() { + sc := bufio.NewScanner(stderr) + for sc.Scan() { + if m := trycloudflareRe.FindString(sc.Text()); m != "" { + urlCh <- m + // keep draining so the pipe doesn't block cloudflared + for sc.Scan() { + } + return + } + } + }() + select { + case u := <-urlCh: + return u, c, nil + case <-time.After(40 * time.Second): + _ = c.Process.Kill() + return "", nil, fmt.Errorf("timed out waiting for tunnel URL") + } +} + +func researchStateDir(cfg *config.Config) string { return filepath.Join(cfg.ConfigDir, "research") } + +func writeResearchState(cfg *config.Config, st researchState) error { + dir := researchStateDir(cfg) + if err := os.MkdirAll(dir, 0o700); err != nil { + return err + } + b, _ := json.MarshalIndent(st, "", " ") + return os.WriteFile(filepath.Join(dir, st.Program+".json"), b, 0o600) +} + +func removeResearchState(cfg *config.Config, name string) error { + return os.Remove(filepath.Join(researchStateDir(cfg), name+".json")) +} + +// loadResearchState reads the named program's state, or the only one if name +// is empty. 
+func loadResearchState(cfg *config.Config, name string) (researchState, error) { + dir := researchStateDir(cfg) + if name != "" { + return readResearchStateFile(filepath.Join(dir, name+".json")) + } + entries, err := os.ReadDir(dir) + if err != nil { + return researchState{}, fmt.Errorf("no running program (publish one first)") + } + var found []string + for _, e := range entries { + if strings.HasSuffix(e.Name(), ".json") { + found = append(found, e.Name()) + } + } + if len(found) == 0 { + return researchState{}, fmt.Errorf("no running program (publish one first)") + } + if len(found) > 1 { + return researchState{}, fmt.Errorf("multiple programs running — pass the name") + } + return readResearchStateFile(filepath.Join(dir, found[0])) +} + +func readResearchStateFile(path string) (researchState, error) { + b, err := os.ReadFile(path) + if err != nil { + return researchState{}, fmt.Errorf("read program state: %w", err) + } + var st researchState + if err := json.Unmarshal(b, &st); err != nil { + return researchState{}, err + } + return st, nil +} + +func ownerPost(ctx context.Context, st researchState, path string, body []byte) (*http.Response, error) { + req, _ := http.NewRequestWithContext(ctx, http.MethodPost, st.LocalAddr+path, strings.NewReader(string(body))) + req.Header.Set("Authorization", "Bearer "+st.OwnerToken) + req.Header.Set("Content-Type", "application/json") + return http.DefaultClient.Do(req) +} + +func ownerGet(ctx context.Context, st researchState, path string) (*http.Response, error) { + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, st.LocalAddr+path, nil) + req.Header.Set("Authorization", "Bearer "+st.OwnerToken) + return http.DefaultClient.Do(req) +} diff --git a/internal/embed/skills/research-program/SKILL.md b/internal/embed/skills/research-program/SKILL.md new file mode 100644 index 00000000..b923f3b9 --- /dev/null +++ b/internal/embed/skills/research-program/SKILL.md @@ -0,0 +1,87 @@ +--- +name: research-program +description: 
Stand up a decentralized auto-research program on the Obol Stack — publish a research ID, admit worker runners over the open internet, collect hypotheses/results in a private collective knowledge base, and distribute rewards proportional to validated impact. Wraps `obol research` + the worker runner; true to karpathy/autoresearch.
---

# Research Program (decentralized auto-research)

Publish a **research ID**, let **worker runners on any machine** join over the
open internet, have them run real experiments and post results to a **collective
knowledge base private to the group**, and pay out **proportional to validated
impact**. The owner is a pure coordinator — it never runs an experiment.

This wraps two commands: `obol research` (owner) and `scripts/worker.py` (runner).

## Declarative model (true to autoresearch)

A program is essentially a `TASK.md` frontmatter: an **arbitrary metric**, a
**direction**, and a **KEEP rule**. Any domain lands without a schema change.

```
metric      val_bpb            # any string: val_bpb, auc, latency_ms, ΔΔG, …
direction   minimize|maximize
accept      beats-champion|threshold
split       by-impact|champion-takes-all
membership  open|invite
```

Operational policy (how a runner sets up GPUs, which hypotheses to try) is
off-chain — it lives in `program.md` and in the runner, exactly as
AutoScientists keeps `LAUNCH.md` off-chain.

## 1. Owner — publish the program (on your machine)

```bash
obol research publish nanogpt-valbpb \
  --objective "Drive nanoGPT val_bpb down" \
  --metric val_bpb --direction minimize --accept beats-champion \
  --baseline 1.20 --pool 100 --token OBOL --network base-sepolia \
  --membership invite --split by-impact
```

This starts the KB + membership server on your machine and opens a **Cloudflare
tunnel**, printing a public URL like `https://<random>.trycloudflare.com`.
Workers on
other machines reach the KB at that URL; every KB route is gated by a member
token (the device-auth flow below), so the program stays private to the group
while being reachable over the open internet. The command runs in the
foreground; Ctrl-C stops the program.

## 2. Runner — join and contribute (on each GPU machine)

```bash
python3 worker.py --kb https://<random>.trycloudflare.com \
  --program nanogpt-valbpb --worker spark1 --time-budget 60
```

The runner prints a **join code** and waits. Default experiment is the real
karpathy/autoresearch nanoGPT loop (`uv run train.py`, parsing `val_bpb:`); pass
`--experiment "<shell command that prints '<metric>: <value>'>"` for any other
task. The runner needs the autoresearch repo prepared once (`uv run prepare.py`).

## 3. Owner — admit the runner (the membership decision)

```bash
obol research approve <user-code>
```

Only the owner can approve, so the owner alone decides who joins. With
`--membership open`, runners are auto-admitted and this step is skipped.

## 4. Owner — watch progress and settle

```bash
obol research status nanogpt-valbpb
```

Shows the roster, every submitted result, the current champion, and the
impact-proportional payout split. First-verified-wins on duplicate
improvements; payout share ∝ each accepted result's validated metric gain.

## What's private vs. public

- **Private to the group** (token-gated): the KB — `/task`, `/champion`,
  `/results`, `/status`. A request without a valid member token for THIS
  program gets 401/403.
- **Public** (the secret is the device code, RFC 8628): `/auth/device/code`,
  `/auth/device/token`. Owner-only: `/auth/device/approve`.

Never expose the KB as an open public route — membership is the whole point.
diff --git a/internal/embed/skills/research-program/scripts/worker.py b/internal/embed/skills/research-program/scripts/worker.py new file mode 100644 index 00000000..494a6b36 --- /dev/null +++ b/internal/embed/skills/research-program/scripts/worker.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +"""Auto-research worker — the runner side of an obol decentralized research program. + +Joins a program's collective knowledge base over the open internet (the +owner's Cloudflare URL), runs one real experiment (karpathy/autoresearch +nanoGPT by default), and posts its metric back. The KB decides KEEP/REJECT +and tracks the champion; rewards are settled by the owner. + +Stdlib only (urllib/json/subprocess) so it drops onto any runner with no +install. The membership flow is RFC 8628 device-auth: print a user code, the +owner approves it, we poll for a member token, then every KB call carries it. + +Usage: + python3 worker.py --kb --program --worker \\ + [--time-budget 60] [--repo ~/autoresearch] [--experiment ""] + +Without --experiment it runs the autoresearch baseline: + cd && TIME_BUDGET override && uv run train.py → parses 'val_bpb:'. 
+""" + +import argparse +import json +import os +import re +import subprocess +import sys +import time +import urllib.error +import urllib.request + + +def _post(url, token, body, timeout=120): + data = json.dumps(body).encode() + req = urllib.request.Request(url, data=data, method="POST") + req.add_header("Content-Type", "application/json") + if token: + req.add_header("Authorization", "Bearer " + token) + with urllib.request.urlopen(req, timeout=timeout) as r: + return json.loads(r.read().decode()) + + +def _get(url, token, timeout=60): + req = urllib.request.Request(url, method="GET") + if token: + req.add_header("Authorization", "Bearer " + token) + with urllib.request.urlopen(req, timeout=timeout) as r: + return json.loads(r.read().decode()) + + +def log(msg): + print(msg, file=sys.stderr, flush=True) + + +def join(kb, program, worker): + """Device-auth: get a code, wait for owner approval, return a member token.""" + grant = _post(kb + "/auth/device/code", None, {"worker": worker}) + user_code = grant["user_code"] + interval = max(2, int(grant.get("interval", 5))) + log("") + log("=" * 52) + log(" JOIN CODE for %s: %s" % (program, user_code)) + log(" Owner runs: obol research approve %s" % user_code) + log("=" * 52) + log("") + deadline = time.time() + int(grant.get("expires_in", 900)) + while time.time() < deadline: + res = _post(kb + "/auth/device/token", None, {"device_code": grant["device_code"]}) + if res.get("status") == "authorized": + log("Admitted to %s." % program) + return res["token"] + time.sleep(interval) + raise SystemExit("join timed out waiting for owner approval") + + +# Hardware adaptation applied to train.py before running. autoresearch ships +# a FlashAttention-3 attention path (flash_attn_func) that has no kernel image +# for some GPUs (e.g. NVIDIA GB10 / Blackwell sm_121). 
train.py is the file an +# autoresearch agent edits, so swapping that one call to PyTorch-native SDPA — +# which runs on any CUDA device — is a legitimate, in-framework adaptation. +_FA3_CALL = "y = fa3.flash_attn_func(q, k, v, causal=True, window_size=window_size)" +_SDPA_CALL = ("y = torch.nn.functional.scaled_dot_product_attention(" + "q.transpose(1, 2), k.transpose(1, 2), v.transpose(1, 2), " + "is_causal=True, enable_gqa=True).transpose(1, 2)") + +# Model/batch fit: the default 124M model at DEVICE_BATCH_SIZE=128 (×2048 seq) +# OOM-kills on a memory-pressured GPU. Shrink to a small GPT at a small batch +# so eager training fits and finishes fast — train.py's own comment says +# "reduce if OOM". Still a real GPT producing a real val_bpb. +_TRAIN_SUBS = [ + (_FA3_CALL, _SDPA_CALL), + (" n_layer: int = 12", " n_layer: int = 4"), + (" n_embd: int = 768", " n_embd: int = 512"), + ("DEVICE_BATCH_SIZE = 128", "DEVICE_BATCH_SIZE = 8"), + # One micro-batch per optimizer step: the default 2**19 token batch means + # 32 grad-accum micro-steps per step (~35s/step eager on GB10). 2**14 = + # batch*seq, so grad_accum_steps=1 and a step is ~1s — train.py needs + # step>10 to stop, so this keeps the whole run to seconds, not minutes. + ("TOTAL_BATCH_SIZE = 2**19", "TOTAL_BATCH_SIZE = 2**14"), +] + +_PREP = r''' +import re, shutil +shutil.copy("train.py", "train.py.obolbak"); shutil.copy("prepare.py", "prepare.py.obolbak") +t = open("train.py").read() +for a, b in {subs!r}: + t = t.replace(a, b) +open("train.py", "w").write(t) +p = open("prepare.py").read() +p = re.sub(r"^TIME_BUDGET = .*", "TIME_BUDGET = {budget}", p, flags=re.M) +p = re.sub(r"^EVAL_TOKENS = .*", "EVAL_TOKENS = 131072 # shrunk for fast eager eval", p, flags=re.M) +open("prepare.py", "w").write(p) +''' + + +def run_autoresearch(repo, time_budget): + """Run the real karpathy/autoresearch training; return (val_bpb, tail). 
+ + Adapts train.py for the local GPU (FA3 → SDPA), shrinks the fixed time + budget, runs eager (FA3's absence makes torch.compile tracing moot on + these devices), then restores the originals. + """ + repo = os.path.expanduser(repo) + if not os.path.isdir(repo): + raise SystemExit("autoresearch repo not found at %s" % repo) + env = dict(os.environ) + env["PATH"] = os.path.expanduser("~/.local/bin") + ":" + env.get("PATH", "") + # Run eager (no torch.compile). On bleeding-edge GPUs (NVIDIA GB10 / + # Blackwell sm_121a) Triton/ptxas can't yet assemble inductor kernels; + # eager has no Triton dependency and just runs. The cost of eager is a + # slow final eval over EVAL_TOKENS, which we shrink below so the run + # completes quickly — both are legitimate train.py-for-this-hardware edits. + env["TORCHDYNAMO_DISABLE"] = "1" + + prep = _PREP.format(subs=_TRAIN_SUBS, budget=int(time_budget)) + cmd = ( + "cd %s && python3 -c %s && " + "uv run --no-sync python train.py; " + "mv -f train.py.obolbak train.py 2>/dev/null; mv -f prepare.py.obolbak prepare.py 2>/dev/null || true" + % (repo, _shquote(prep)) + ) + log("Running autoresearch experiment (TIME_BUDGET=%ss, GB10-adapted) …" % time_budget) + p = subprocess.run(["bash", "-lc", cmd], env=env, capture_output=True, text=True) + out = (p.stdout or "") + "\n" + (p.stderr or "") + m = re.search(r"^val_bpb:\s*([0-9.]+)", out, re.MULTILINE) + if not m: + log(out[-2000:]) + raise SystemExit("could not parse val_bpb from train.py output") + return float(m.group(1)), out[-1500:] + + +def _shquote(s): + return "'" + s.replace("'", "'\\''") + "'" + + +def run_custom(experiment, metric): + """Run an arbitrary experiment shell command; parse ': '.""" + p = subprocess.run(["bash", "-lc", experiment], capture_output=True, text=True) + out = (p.stdout or "") + "\n" + (p.stderr or "") + m = re.search(r"^%s:\s*([0-9.eE+-]+)" % re.escape(metric), out, re.MULTILINE) + if not m: + log(out[-2000:]) + raise SystemExit("could not parse '%s:' from 
experiment output" % metric) + return float(m.group(1)), out[-1500:] + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--kb", required=True, help="owner KB base URL (Cloudflare)") + ap.add_argument("--program", required=True) + ap.add_argument("--worker", required=True) + ap.add_argument("--time-budget", type=int, default=60) + ap.add_argument("--repo", default="~/autoresearch") + ap.add_argument("--experiment", default="", help="custom experiment shell cmd (else autoresearch)") + args = ap.parse_args() + + kb = args.kb.rstrip("/") + + token = join(kb, args.program, args.worker) + + task = _get(kb + "/task", token) + metric = task["program"]["criteria"]["metric"] + champ = task.get("champion") + log("Task: optimize %s (%s). Current champion: %s" % ( + metric, task["program"]["criteria"]["direction"], + ("%.6f" % champ["value"]) if champ else "none")) + + if args.experiment: + value, tail = run_custom(args.experiment, metric) + else: + value, tail = run_autoresearch(args.repo, args.time_budget) + log("Experiment %s = %.6f" % (metric, value)) + + res = _post(kb + "/results", token, {"worker": args.worker, "value": value, "output": tail}) + verdict = "KEPT (new champion)" if res.get("champion") else ("ACCEPTED" if res.get("accepted") else "rejected") + log("Submitted: %s = %.6f → %s (impact %.6f)" % (metric, value, verdict, res.get("impact", 0.0))) + # Machine-readable final line on stdout. 
+ print(json.dumps({"worker": args.worker, "metric": metric, "value": value, + "accepted": res.get("accepted"), "champion": res.get("champion"), + "impact": res.get("impact")})) + + +if __name__ == "__main__": + try: + main() + except urllib.error.HTTPError as e: + raise SystemExit("HTTP %s: %s" % (e.code, e.read().decode()[:300])) diff --git a/internal/research/groupauth/groupauth.go b/internal/research/groupauth/groupauth.go new file mode 100644 index 00000000..ebc7863e --- /dev/null +++ b/internal/research/groupauth/groupauth.go @@ -0,0 +1,273 @@ +// Package groupauth is the membership/OAuth layer for decentralized +// auto-research groups: it issues and verifies the tokens that gate a +// research program's PRIVATE collective knowledge base. +// +// It is an RFC 8628-style device-authorization flow adapted from the +// Darkbloom / d-inference coordinator (coordinator/api/device_auth.go) — +// same shape, research-group semantics: +// +// 1. A worker agent calls RequestCode() → device_code + user_code +// 2. The PROGRAM OWNER approves the user_code → links it to the group +// (this is the membership decision — "coordination at the OAuth level") +// 3. The worker polls Poll(device_code) → a member token +// 4. The worker presents the token to the program's → read/write the +// private knowledge-base service group-private KB +// +// The store is in-memory and dependency-free on purpose: a research +// program is a small, single-coordinator group, and keeping this package +// free of cluster/store deps lets the serviceoffer-controller embed it +// (or a sidecar mount it) without pulling the whole coordinator in. Tokens +// are persisted only as SHA-256 hashes; the raw token is returned once. +package groupauth + +import ( + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "errors" + "math/big" + "strings" + "sync" + "time" +) + +const ( + // CodeExpiry is how long an unused device code stays valid. 
+ CodeExpiry = 15 * time.Minute + // PollInterval is the minimum seconds a worker should wait between polls. + PollInterval = 5 + // tokenPrefix namespaces member tokens (mirrors the d-inference prefix + // convention so tokens are self-identifying in logs). + tokenPrefix = "obol-research-mt-" + // userCharset excludes ambiguous glyphs (0/O, 1/I/L). + userCharset = "ABCDEFGHJKMNPQRSTUVWXYZ23456789" +) + +// Status values for a device code. +const ( + StatusPending = "pending" + StatusApproved = "approved" +) + +var ( + // ErrNotFound is returned when a device code or token is unknown. + ErrNotFound = errors.New("groupauth: not found") + // ErrExpired is returned when a device code has passed its expiry. + ErrExpired = errors.New("groupauth: device code expired") + // ErrAlreadyUsed is returned when approving a non-pending code. + ErrAlreadyUsed = errors.New("groupauth: code already used") +) + +// deviceCode is an in-flight authorization request. +type deviceCode struct { + DeviceCode string + UserCode string + Status string + GroupID string // the ResearchProgram id, set on approval + WorkerID string // optional self-declared worker label + ExpiresAt time.Time +} + +// memberToken is an issued, hashed membership credential. +type memberToken struct { + TokenHash string + GroupID string + Label string + IssuedAt time.Time + Active bool +} + +// CodeGrant is returned to a worker starting a login. +type CodeGrant struct { + DeviceCode string `json:"device_code"` + UserCode string `json:"user_code"` + ExpiresIn int `json:"expires_in"` + Interval int `json:"interval"` +} + +// PollResult is returned while a worker polls for approval. +type PollResult struct { + // Status is "authorization_pending" until approved, then "authorized". + Status string `json:"status"` + // Token is set only once, when Status first becomes "authorized". 
+ Token string `json:"token,omitempty"` + GroupID string `json:"group_id,omitempty"` +} + +// Authority issues and verifies a single research group's membership +// tokens. One Authority per ResearchProgram; safe for concurrent use. +type Authority struct { + mu sync.Mutex + byDevice map[string]*deviceCode + byUser map[string]*deviceCode + tokens map[string]*memberToken // keyed by token hash + now func() time.Time // injectable clock for tests +} + +// New returns an empty Authority. +func New() *Authority { + return &Authority{ + byDevice: map[string]*deviceCode{}, + byUser: map[string]*deviceCode{}, + tokens: map[string]*memberToken{}, + now: time.Now, + } +} + +// RequestCode starts a device login for a worker. workerID is an optional +// self-declared label (e.g. an address or agent name) recorded for the +// owner's approval decision; it is NOT trusted as identity. +func (a *Authority) RequestCode(workerID string) (CodeGrant, error) { + a.mu.Lock() + defer a.mu.Unlock() + + dcVal, err := randomHex(32) + if err != nil { + return CodeGrant{}, err + } + userCode, err := generateUserCode() + if err != nil { + return CodeGrant{}, err + } + // Resolve the rare user-code collision before storing. + for i := 0; i < 5; i++ { + if _, taken := a.byUser[userCode]; !taken { + break + } + if userCode, err = generateUserCode(); err != nil { + return CodeGrant{}, err + } + } + + dc := &deviceCode{ + DeviceCode: dcVal, + UserCode: userCode, + Status: StatusPending, + WorkerID: strings.TrimSpace(workerID), + ExpiresAt: a.now().Add(CodeExpiry), + } + a.byDevice[dcVal] = dc + a.byUser[userCode] = dc + + return CodeGrant{ + DeviceCode: dcVal, + UserCode: userCode, + ExpiresIn: int(CodeExpiry.Seconds()), + Interval: PollInterval, + }, nil +} + +// Approve is the membership decision: the program owner links a pending +// user_code to the group. Only the owner calls this (it is the gate that +// makes the knowledge base private to the group). 
user_code matching is +// case-insensitive and space-trimmed. +func (a *Authority) Approve(groupID, userCode string) error { + a.mu.Lock() + defer a.mu.Unlock() + + dc, ok := a.byUser[normalizeUserCode(userCode)] + if !ok { + return ErrNotFound + } + if a.now().After(dc.ExpiresAt) { + return ErrExpired + } + if dc.Status != StatusPending { + return ErrAlreadyUsed + } + dc.Status = StatusApproved + dc.GroupID = strings.TrimSpace(groupID) + return nil +} + +// Poll returns the authorization state for a device code. On the first +// poll after approval it mints and returns the raw member token exactly +// once; only its SHA-256 hash is retained. +func (a *Authority) Poll(deviceCode string) (PollResult, error) { + a.mu.Lock() + defer a.mu.Unlock() + + dc, ok := a.byDevice[deviceCode] + if !ok { + return PollResult{}, ErrNotFound + } + if a.now().After(dc.ExpiresAt) { + return PollResult{}, ErrExpired + } + + if dc.Status != StatusApproved { + return PollResult{Status: "authorization_pending"}, nil + } + + raw, err := randomHex(32) + if err != nil { + return PollResult{}, err + } + rawToken := tokenPrefix + raw + a.tokens[hashToken(rawToken)] = &memberToken{ + TokenHash: hashToken(rawToken), + GroupID: dc.GroupID, + Label: "device-" + dc.UserCode, + IssuedAt: a.now(), + Active: true, + } + // Consume the code so the token is issued once. + delete(a.byDevice, dc.DeviceCode) + delete(a.byUser, dc.UserCode) + + return PollResult{Status: "authorized", Token: rawToken, GroupID: dc.GroupID}, nil +} + +// VerifyToken reports whether rawToken is an active member token and, if +// so, the group it grants access to. This is what the private knowledge- +// base service calls on each request. 
+func (a *Authority) VerifyToken(rawToken string) (groupID string, ok bool) { + a.mu.Lock() + defer a.mu.Unlock() + + t, found := a.tokens[hashToken(rawToken)] + if !found || !t.Active { + return "", false + } + return t.GroupID, true +} + +// Revoke deactivates a member token (owner removing a worker from the group). +func (a *Authority) Revoke(rawToken string) { + a.mu.Lock() + defer a.mu.Unlock() + if t, found := a.tokens[hashToken(rawToken)]; found { + t.Active = false + } +} + +// --- helpers --- + +func generateUserCode() (string, error) { + code := make([]byte, 8) + for i := range code { + n, err := rand.Int(rand.Reader, big.NewInt(int64(len(userCharset)))) + if err != nil { + return "", err + } + code[i] = userCharset[n.Int64()] + } + return string(code[:4]) + "-" + string(code[4:]), nil +} + +func normalizeUserCode(s string) string { + return strings.ToUpper(strings.TrimSpace(s)) +} + +func randomHex(n int) (string, error) { + b := make([]byte, n) + if _, err := rand.Read(b); err != nil { + return "", err + } + return hex.EncodeToString(b), nil +} + +func hashToken(s string) string { + h := sha256.Sum256([]byte(s)) + return hex.EncodeToString(h[:]) +} diff --git a/internal/research/groupauth/groupauth_test.go b/internal/research/groupauth/groupauth_test.go new file mode 100644 index 00000000..421b1cca --- /dev/null +++ b/internal/research/groupauth/groupauth_test.go @@ -0,0 +1,98 @@ +package groupauth + +import ( + "strings" + "testing" + "time" +) + +func TestDeviceFlow_CodeApproveTokenVerify(t *testing.T) { + a := New() + + grant, err := a.RequestCode("0xworker") + if err != nil { + t.Fatalf("RequestCode: %v", err) + } + if grant.DeviceCode == "" || grant.UserCode == "" { + t.Fatal("empty grant") + } + if grant.Interval != PollInterval || grant.ExpiresIn <= 0 { + t.Errorf("grant metadata = %+v", grant) + } + + // Before approval: pending, no token. 
+ if r, err := a.Poll(grant.DeviceCode); err != nil || r.Status != "authorization_pending" || r.Token != "" { + t.Fatalf("pre-approval poll = %+v err=%v", r, err) + } + + // Owner approves the user_code (case/space-insensitive). + if err := a.Approve("nanogpt-valbpb", " "+strings.ToLower(grant.UserCode)+" "); err != nil { + t.Fatalf("Approve: %v", err) + } + + // First post-approval poll mints the token. + res, err := a.Poll(grant.DeviceCode) + if err != nil { + t.Fatalf("Poll after approve: %v", err) + } + if res.Status != "authorized" || res.GroupID != "nanogpt-valbpb" { + t.Fatalf("poll result = %+v", res) + } + if !strings.HasPrefix(res.Token, tokenPrefix) { + t.Errorf("token = %q, want %s prefix", res.Token, tokenPrefix) + } + + // The token verifies and names the group. + if gid, ok := a.VerifyToken(res.Token); !ok || gid != "nanogpt-valbpb" { + t.Errorf("VerifyToken = %q,%v", gid, ok) + } + + // Token is single-issue: the code is consumed. + if _, err := a.Poll(grant.DeviceCode); err != ErrNotFound { + t.Errorf("second poll err = %v, want ErrNotFound", err) + } + + // Revocation removes access. 
+ a.Revoke(res.Token) + if _, ok := a.VerifyToken(res.Token); ok { + t.Error("revoked token still verifies") + } +} + +func TestApprove_Errors(t *testing.T) { + a := New() + if err := a.Approve("g", "NOPE-NOPE"); err != ErrNotFound { + t.Errorf("unknown code err = %v, want ErrNotFound", err) + } + + grant, _ := a.RequestCode("") + if err := a.Approve("g", grant.UserCode); err != nil { + t.Fatalf("first approve: %v", err) + } + if err := a.Approve("g", grant.UserCode); err != ErrAlreadyUsed { + t.Errorf("double approve err = %v, want ErrAlreadyUsed", err) + } +} + +func TestExpiry(t *testing.T) { + a := New() + base := time.Unix(1_700_000_000, 0) + a.now = func() time.Time { return base } + + grant, _ := a.RequestCode("w") + a.now = func() time.Time { return base.Add(CodeExpiry + time.Second) } + + if err := a.Approve("g", grant.UserCode); err != ErrExpired { + t.Errorf("approve expired err = %v, want ErrExpired", err) + } + if _, err := a.Poll(grant.DeviceCode); err != ErrExpired { + t.Errorf("poll expired err = %v, want ErrExpired", err) + } +} + +func TestVerify_UnknownToken(t *testing.T) { + a := New() + if _, ok := a.VerifyToken("obol-research-mt-deadbeef"); ok { + t.Error("unknown token must not verify") + } +} diff --git a/internal/research/kb/kb.go b/internal/research/kb/kb.go new file mode 100644 index 00000000..adf0690d --- /dev/null +++ b/internal/research/kb/kb.go @@ -0,0 +1,288 @@ +// Package kb is the collective knowledge base for a decentralized +// auto-research program: the workspace AutoScientists agents coordinate +// through (results log, champion, roster), plus the acceptance rule +// (autoresearch's KEEP) and the impact-proportional payout split. +// +// It is pure and concurrency-safe; the HTTP surface (membership gating, +// device-auth) lives in internal/research/server. +package kb + +import ( + "errors" + "math" + "sort" + "sync" + "time" +) + +// Direction is whether lower or higher metric values are better. 
type Direction string

// The two directions a program's metric can be optimised in.
const (
	Minimize = Direction("minimize")
	Maximize = Direction("maximize")
)

// AcceptMode names the KEEP rule applied to a submitted result.
type AcceptMode string

const (
	// BeatsChampion keeps a result only when it improves on the current
	// champion, or on the published Baseline while no champion exists yet.
	BeatsChampion = AcceptMode("beats-champion")
	// Threshold keeps every result that clears Criteria.Threshold.
	Threshold = AcceptMode("threshold")
)

// SplitMode names how the reward pool is divided at close.
type SplitMode string

const (
	// ByImpact shares the pool in proportion to the validated impact (the
	// metric improvement) of each accepted result.
	ByImpact = SplitMode("by-impact")
	// ChampionTakesAll hands the entire pool to the final champion's worker.
	ChampionTakesAll = SplitMode("champion-takes-all")
)

// Criteria mirrors AutoScientists' TASK.md frontmatter: an arbitrary
// metric name, the direction in which it improves, and the KEEP rule.
// Nothing in it is domain-specific.
type Criteria struct {
	Metric    string     `json:"metric"`
	Direction Direction  `json:"direction"`
	Accept    AcceptMode `json:"accept"`
	Threshold *float64   `json:"threshold,omitempty"` // consulted only by the Threshold accept mode
}

// Program is the published research ID.
type Program struct {
	ID        string    `json:"id"`
	Objective string    `json:"objective"`
	Criteria  Criteria  `json:"criteria"`
	Baseline  *float64  `json:"baseline,omitempty"` // reference metric; first improvement's impact is measured against it
	Pool      float64   `json:"pool"`
	Token     string    `json:"token"`
	Network   string    `json:"network"`
	Split     SplitMode `json:"split"`
}

// Result is one submitted experiment outcome.
+type Result struct { + Seq int `json:"seq"` + Worker string `json:"worker"` + Value float64 `json:"value"` + Output string `json:"output,omitempty"` // raw train.py tail, for audit + At time.Time `json:"at"` + Accepted bool `json:"accepted"` + Impact float64 `json:"impact"` + Champion bool `json:"champion"` // true if this result became the champion +} + +// KB holds one program's collective state. +type KB struct { + mu sync.Mutex + prog Program + results []Result + champion *Result + roster map[string]time.Time + seq int + now func() time.Time +} + +// New returns a KB for a program. +func New(p Program) *KB { + return &KB{prog: p, roster: map[string]time.Time{}, now: time.Now} +} + +// Program returns the published program (immutable copy). +func (k *KB) Program() Program { + k.mu.Lock() + defer k.mu.Unlock() + return k.prog +} + +// Join records a worker in the roster (idempotent). +func (k *KB) Join(worker string) { + k.mu.Lock() + defer k.mu.Unlock() + if _, ok := k.roster[worker]; !ok { + k.roster[worker] = k.now() + } +} + +// Roster returns the joined workers and when they joined. +func (k *KB) Roster() map[string]time.Time { + k.mu.Lock() + defer k.mu.Unlock() + out := make(map[string]time.Time, len(k.roster)) + for w, t := range k.roster { + out[w] = t + } + return out +} + +// Champion returns a copy of the current champion result, or nil. +func (k *KB) Champion() *Result { + k.mu.Lock() + defer k.mu.Unlock() + if k.champion == nil { + return nil + } + c := *k.champion + return &c +} + +// Results returns the full results log in submission order. +func (k *KB) Results() []Result { + k.mu.Lock() + defer k.mu.Unlock() + out := make([]Result, len(k.results)) + copy(out, k.results) + return out +} + +// Submit applies the KEEP rule to a worker's result: it records the result, +// decides acceptance + impact, and promotes the champion if it improved. 
+// Submission is serialized, so the FIRST result to beat the current best +// wins it (first-verified-wins). A worker that is not in the roster is +// auto-joined. +func (k *KB) Submit(worker string, value float64, output string) (Result, error) { + if worker == "" { + return Result{}, errors.New("kb: worker required") + } + if math.IsNaN(value) || math.IsInf(value, 0) { + return Result{}, errors.New("kb: value must be finite") + } + + k.mu.Lock() + defer k.mu.Unlock() + + if _, ok := k.roster[worker]; !ok { + k.roster[worker] = k.now() + } + + k.seq++ + r := Result{Seq: k.seq, Worker: worker, Value: value, Output: output, At: k.now()} + + switch k.prog.Criteria.Accept { + case Threshold: + if k.prog.Criteria.Threshold != nil { + th := *k.prog.Criteria.Threshold + if k.better(value, th) || value == th { + r.Accepted = true + r.Impact = k.gain(th, value) + } + } + // The champion under threshold mode is still the best-so-far. + if r.Accepted && (k.champion == nil || k.better(value, k.champion.Value)) { + r.Champion = true + } + default: // BeatsChampion + ref, hasRef := k.bestRef() + if !hasRef || k.better(value, ref) { + r.Accepted = true + r.Champion = true + if hasRef { + r.Impact = k.gain(ref, value) + } else { + // First result with no Baseline sets the baseline only. + r.Impact = 0 + } + } + } + + if r.Champion { + c := r + k.champion = &c + } + k.results = append(k.results, r) + return r, nil +} + +// Payouts splits the reward pool across workers per the program's SplitMode. +// Returns worker -> amount (rounded to 6 dp). Empty when nothing is owed. +func (k *KB) Payouts() map[string]float64 { + k.mu.Lock() + defer k.mu.Unlock() + + out := map[string]float64{} + if k.prog.Pool <= 0 { + return out + } + + if k.prog.Split == ChampionTakesAll { + if k.champion != nil { + out[k.champion.Worker] = round6(k.prog.Pool) + } + return out + } + + // ByImpact (default): proportional to accepted impact. 
+ total := 0.0 + per := map[string]float64{} + for _, r := range k.results { + if r.Accepted && r.Impact > 0 { + per[r.Worker] += r.Impact + total += r.Impact + } + } + if total <= 0 { + return out + } + for w, imp := range per { + out[w] = round6(k.prog.Pool * imp / total) + } + return out +} + +// --- helpers --- + +// better reports whether a is strictly better than b under the direction. +func (k *KB) better(a, b float64) bool { + if k.prog.Criteria.Direction == Maximize { + return a > b + } + return a < b +} + +// gain is the non-negative improvement of value over ref under the +// program's direction (ref-value when minimizing, value-ref when maximizing). +func (k *KB) gain(ref, value float64) float64 { + var d float64 + if k.prog.Criteria.Direction == Maximize { + d = value - ref + } else { + d = ref - value + } + if d < 0 { + d = 0 + } + return d +} + +// bestRef returns the current reference to beat (champion value, else +// Baseline) and whether one exists. +func (k *KB) bestRef() (float64, bool) { + if k.champion != nil { + return k.champion.Value, true + } + if k.prog.Baseline != nil { + return *k.prog.Baseline, true + } + return 0, false +} + +// Sorted snapshot helper for stable status output. 
+func (k *KB) RosterSorted() []string { + r := k.Roster() + ws := make([]string, 0, len(r)) + for w := range r { + ws = append(ws, w) + } + sort.Strings(ws) + return ws +} + +func round6(f float64) float64 { + return math.Round(f*1e6) / 1e6 +} diff --git a/internal/research/kb/kb_test.go b/internal/research/kb/kb_test.go new file mode 100644 index 00000000..a6985c92 --- /dev/null +++ b/internal/research/kb/kb_test.go @@ -0,0 +1,129 @@ +package kb + +import ( + "math" + "testing" +) + +func f(v float64) *float64 { return &v } + +func minimizeProg(split SplitMode, baseline *float64) Program { + return Program{ + ID: "nanogpt-valbpb", + Criteria: Criteria{Metric: "val_bpb", Direction: Minimize, Accept: BeatsChampion}, + Baseline: baseline, + Pool: 100, + Split: split, + } +} + +func TestSubmit_BeatsChampion_Minimize(t *testing.T) { + k := New(minimizeProg(ByImpact, f(1.20))) // baseline val_bpb 1.20 + + // spark1 improves on baseline → accepted, champion, impact 1.20-1.10. + r1, err := k.Submit("spark1", 1.10, "") + if err != nil { + t.Fatal(err) + } + if !r1.Accepted || !r1.Champion || math.Abs(r1.Impact-0.10) > 1e-9 { + t.Fatalf("r1 = %+v, want accepted champion impact 0.10", r1) + } + + // spark2 improves further → accepted, new champion, impact 1.10-1.05. + r2, _ := k.Submit("spark2", 1.05, "") + if !r2.Accepted || !r2.Champion || math.Abs(r2.Impact-0.05) > 1e-9 { + t.Fatalf("r2 = %+v, want accepted champion impact 0.05", r2) + } + + // spark1 submits a worse value → rejected, not champion, no impact. + r3, _ := k.Submit("spark1", 1.30, "") + if r3.Accepted || r3.Champion || r3.Impact != 0 { + t.Fatalf("r3 = %+v, want rejected", r3) + } + + if c := k.Champion(); c == nil || c.Worker != "spark2" || c.Value != 1.05 { + t.Fatalf("champion = %+v, want spark2 @1.05", c) + } + + // By-impact payout: spark1 0.10, spark2 0.05 → 2:1 of the 100 pool. 
+ pay := k.Payouts() + if math.Abs(pay["spark1"]-66.666667) > 1e-3 || math.Abs(pay["spark2"]-33.333333) > 1e-3 { + t.Fatalf("payouts = %+v, want ~66.67/33.33", pay) + } +} + +func TestSubmit_FirstVerifiedWins(t *testing.T) { + k := New(minimizeProg(ByImpact, f(1.20))) + // Two workers submit the SAME improvement; first one in wins the champion. + a, _ := k.Submit("spark1", 1.10, "") + b, _ := k.Submit("spark2", 1.10, "") + if !a.Champion { + t.Error("first identical submission must take the champion") + } + if b.Accepted || b.Champion { + t.Errorf("second identical submission must not beat the champion: %+v", b) + } +} + +func TestSubmit_FirstResultNoBaselineSetsBaseline(t *testing.T) { + k := New(minimizeProg(ByImpact, nil)) // no baseline + r, _ := k.Submit("spark1", 1.10, "") + if !r.Accepted || !r.Champion || r.Impact != 0 { + t.Fatalf("first result w/o baseline = %+v, want champion impact 0", r) + } + // No positive impact yet → no payouts. + if len(k.Payouts()) != 0 { + t.Errorf("payouts before any improvement = %+v, want empty", k.Payouts()) + } +} + +func TestSubmit_ThresholdMode(t *testing.T) { + p := minimizeProg(ByImpact, nil) + p.Criteria.Accept = Threshold + p.Criteria.Threshold = f(1.00) + k := New(p) + + miss, _ := k.Submit("spark1", 1.05, "") // above threshold → reject + if miss.Accepted { + t.Error("1.05 must miss threshold 1.00 (minimize)") + } + hit, _ := k.Submit("spark2", 0.90, "") // clears threshold → accept, impact 0.10 + if !hit.Accepted || math.Abs(hit.Impact-0.10) > 1e-9 { + t.Fatalf("hit = %+v, want accepted impact 0.10", hit) + } +} + +func TestSubmit_MaximizeDirection(t *testing.T) { + p := minimizeProg(ByImpact, f(0.80)) + p.Criteria.Direction = Maximize + p.Criteria.Metric = "auc" + k := New(p) + r, _ := k.Submit("spark1", 0.90, "") // higher is better → +0.10 + if !r.Accepted || math.Abs(r.Impact-0.10) > 1e-9 { + t.Fatalf("maximize r = %+v, want impact 0.10", r) + } + worse, _ := k.Submit("spark2", 0.85, "") + if worse.Accepted { + 
t.Error("0.85 < champion 0.90 must be rejected under maximize") + } +} + +func TestPayouts_ChampionTakesAll(t *testing.T) { + k := New(minimizeProg(ChampionTakesAll, f(1.20))) + k.Submit("spark1", 1.10, "") + k.Submit("spark2", 1.05, "") + pay := k.Payouts() + if pay["spark2"] != 100 || len(pay) != 1 { + t.Fatalf("champion-takes-all = %+v, want spark2:100", pay) + } +} + +func TestSubmit_RejectsNonFinite(t *testing.T) { + k := New(minimizeProg(ByImpact, nil)) + if _, err := k.Submit("spark1", math.Inf(1), ""); err == nil { + t.Error("Inf value must be rejected") + } + if _, err := k.Submit("", 1.0, ""); err == nil { + t.Error("empty worker must be rejected") + } +} diff --git a/internal/research/server/server.go b/internal/research/server/server.go new file mode 100644 index 00000000..9da23241 --- /dev/null +++ b/internal/research/server/server.go @@ -0,0 +1,244 @@ +// Package server is the owner-hosted HTTP surface for a decentralized +// auto-research program: the device-auth (membership) endpoints plus the +// membership-gated knowledge base. One Server hosts one Program; the owner +// runs it on their machine and exposes it over a Cloudflare tunnel so +// workers on other obol-stacks reach it over the open internet — every KB +// route gated by a groupauth member token, never an open public route. +package server + +import ( + "crypto/subtle" + "encoding/json" + "log/slog" + "net/http" + "strings" + + "github.com/ObolNetwork/obol-stack/internal/research/groupauth" + "github.com/ObolNetwork/obol-stack/internal/research/kb" +) + +// Membership modes. +const ( + MembershipOpen = "open" // any worker is auto-approved on login + MembershipInvite = "invite" // the owner must approve each worker +) + +// Server hosts one program. +type Server struct { + prog kb.Program + membership string + owner string // owner admin token (gates approve/status-admin) + auth *groupauth.Authority + store *kb.KB + log *slog.Logger +} + +// New builds a Server for program p. 
ownerToken gates owner-only routes +// (approve). membership is MembershipOpen or MembershipInvite. +func New(p kb.Program, membership, ownerToken string, log *slog.Logger) *Server { + if log == nil { + log = slog.Default() + } + if membership == "" { + membership = MembershipInvite + } + return &Server{ + prog: p, + membership: membership, + owner: ownerToken, + auth: groupauth.New(), + store: kb.New(p), + log: log, + } +} + +// KB exposes the underlying store (for the CLI's local status/close). +func (s *Server) KB() *kb.KB { return s.store } + +// Approve links a pending user_code to the group (owner action). Exposed +// so the owner CLI can approve locally without a round-trip token. +func (s *Server) Approve(userCode string) error { + return s.auth.Approve(s.prog.ID, userCode) +} + +// Handler returns the HTTP mux. +func (s *Server) Handler() http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) }) + + // Device-auth (public — the device_code is the secret, RFC 8628). + mux.HandleFunc("POST /auth/device/code", s.handleDeviceCode) + mux.HandleFunc("POST /auth/device/token", s.handleDeviceToken) + // Owner-only approval. + mux.HandleFunc("POST /auth/device/approve", s.ownerOnly(s.handleApprove)) + + // Membership-gated KB. 
+ mux.HandleFunc("GET /task", s.member(s.handleTask)) + mux.HandleFunc("GET /champion", s.member(s.handleChampion)) + mux.HandleFunc("POST /results", s.member(s.handleResults)) + mux.HandleFunc("GET /status", s.member(s.handleStatus)) + return mux +} + +// --- device-auth handlers --- + +func (s *Server) handleDeviceCode(w http.ResponseWriter, r *http.Request) { + var body struct { + Worker string `json:"worker"` + } + _ = json.NewDecoder(r.Body).Decode(&body) + + grant, err := s.auth.RequestCode(body.Worker) + if err != nil { + writeErr(w, http.StatusInternalServerError, "server_error", "failed to create code") + return + } + // Open membership: auto-approve so the worker is admitted without the + // owner acting (the program chose to be public-join). + if s.membership == MembershipOpen { + _ = s.auth.Approve(s.prog.ID, grant.UserCode) + } + writeJSON(w, http.StatusOK, grant) +} + +func (s *Server) handleDeviceToken(w http.ResponseWriter, r *http.Request) { + var body struct { + DeviceCode string `json:"device_code"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.DeviceCode == "" { + writeErr(w, http.StatusBadRequest, "invalid_request", "device_code required") + return + } + res, err := s.auth.Poll(body.DeviceCode) + switch err { + case nil: + // The roster is populated authoritatively on Submit (the token does + // not carry the worker's self-declared id). 
+ writeJSON(w, http.StatusOK, res) + case groupauth.ErrExpired: + writeErr(w, http.StatusGone, "expired_token", "device code expired") + default: + writeErr(w, http.StatusNotFound, "invalid_grant", "device code not found") + } +} + +func (s *Server) handleApprove(w http.ResponseWriter, r *http.Request) { + var body struct { + UserCode string `json:"user_code"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.UserCode == "" { + writeErr(w, http.StatusBadRequest, "invalid_request", "user_code required") + return + } + switch err := s.auth.Approve(s.prog.ID, body.UserCode); err { + case nil: + writeJSON(w, http.StatusOK, map[string]string{"status": "approved"}) + case groupauth.ErrExpired: + writeErr(w, http.StatusGone, "expired_code", "code expired") + case groupauth.ErrAlreadyUsed: + writeErr(w, http.StatusConflict, "already_used", "code already used") + default: + writeErr(w, http.StatusNotFound, "invalid_code", "code not found") + } +} + +// --- KB handlers (member-gated) --- + +func (s *Server) handleTask(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{ + "program": s.store.Program(), + "champion": s.store.Champion(), + }) +} + +func (s *Server) handleChampion(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{"champion": s.store.Champion()}) +} + +func (s *Server) handleResults(w http.ResponseWriter, r *http.Request) { + var body struct { + Worker string `json:"worker"` + Value float64 `json:"value"` + Output string `json:"output"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + writeErr(w, http.StatusBadRequest, "invalid_request", "bad result body") + return + } + res, err := s.store.Submit(body.Worker, body.Value, body.Output) + if err != nil { + writeErr(w, http.StatusBadRequest, "invalid_result", err.Error()) + return + } + s.log.Info("result submitted", + "worker", body.Worker, "value", body.Value, + "accepted", res.Accepted, "champion", 
res.Champion, "impact", res.Impact) + writeJSON(w, http.StatusOK, res) +} + +func (s *Server) handleStatus(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{ + "program": s.store.Program(), + "roster": s.store.RosterSorted(), + "results": s.store.Results(), + "champion": s.store.Champion(), + "payouts": s.store.Payouts(), + }) +} + +// --- middleware --- + +// member gates a handler on a valid member token for THIS program's group. +// The owner admin token is also accepted (the owner is a superuser, so the +// owner CLI can read status without minting itself a member token). +func (s *Server) member(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + tok := bearer(r) + if tok == "" { + writeErr(w, http.StatusUnauthorized, "auth_required", "member token required") + return + } + if s.owner != "" && subtle.ConstantTimeCompare([]byte(tok), []byte(s.owner)) == 1 { + next(w, r) + return + } + gid, ok := s.auth.VerifyToken(tok) + if !ok || gid != s.prog.ID { + writeErr(w, http.StatusForbidden, "not_a_member", "token is not a member of this program") + return + } + next(w, r) + } +} + +// ownerOnly gates a handler on the owner admin token (constant-time). 
+func (s *Server) ownerOnly(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + tok := bearer(r) + if s.owner == "" || subtle.ConstantTimeCompare([]byte(tok), []byte(s.owner)) != 1 { + writeErr(w, http.StatusUnauthorized, "owner_required", "owner token required") + return + } + next(w, r) + } +} + +// --- helpers --- + +func bearer(r *http.Request) string { + h := r.Header.Get("Authorization") + if v, ok := strings.CutPrefix(h, "Bearer "); ok { + return strings.TrimSpace(v) + } + return "" +} + +func writeJSON(w http.ResponseWriter, code int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(v) +} + +func writeErr(w http.ResponseWriter, code int, kind, msg string) { + writeJSON(w, code, map[string]string{"error": kind, "message": msg}) +} diff --git a/internal/research/server/server_test.go b/internal/research/server/server_test.go new file mode 100644 index 00000000..bd7d60aa --- /dev/null +++ b/internal/research/server/server_test.go @@ -0,0 +1,118 @@ +package server + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/ObolNetwork/obol-stack/internal/research/kb" +) + +func testProgram() kb.Program { + base := 1.20 + return kb.Program{ + ID: "nanogpt-valbpb", + Criteria: kb.Criteria{Metric: "val_bpb", Direction: kb.Minimize, Accept: kb.BeatsChampion}, + Baseline: &base, + Pool: 100, Token: "OBOL", Network: "base-sepolia", Split: kb.ByImpact, + } +} + +func do(t *testing.T, h http.Handler, method, path, token string, body any) (*httptest.ResponseRecorder, map[string]any) { + t.Helper() + var r *http.Request + if body != nil { + b, _ := json.Marshal(body) + r = httptest.NewRequest(method, path, bytes.NewReader(b)) + } else { + r = httptest.NewRequest(method, path, nil) + } + if token != "" { + r.Header.Set("Authorization", "Bearer "+token) + } + w := httptest.NewRecorder() + h.ServeHTTP(w, r) 
+ var out map[string]any + _ = json.Unmarshal(w.Body.Bytes(), &out) + return w, out +} + +func TestServer_InviteFlow_EndToEnd(t *testing.T) { + s := New(testProgram(), MembershipInvite, "owner-secret", nil) + h := s.Handler() + + // KB is gated before membership. + if w, _ := do(t, h, "GET", "/task", "", nil); w.Code != http.StatusUnauthorized { + t.Fatalf("ungated /task = %d, want 401", w.Code) + } + + // Worker requests a device code. + _, code := do(t, h, "POST", "/auth/device/code", "", map[string]string{"worker": "spark1"}) + deviceCode, _ := code["device_code"].(string) + userCode, _ := code["user_code"].(string) + if deviceCode == "" || userCode == "" { + t.Fatalf("device code grant = %+v", code) + } + + // Pre-approval poll is pending. + if _, tok := do(t, h, "POST", "/auth/device/token", "", map[string]string{"device_code": deviceCode}); tok["status"] != "authorization_pending" { + t.Fatalf("pre-approval poll = %+v", tok) + } + + // Approve requires the owner token. + if w, _ := do(t, h, "POST", "/auth/device/approve", "wrong", map[string]string{"user_code": userCode}); w.Code != http.StatusUnauthorized { + t.Fatalf("approve w/o owner token = %d, want 401", w.Code) + } + if w, _ := do(t, h, "POST", "/auth/device/approve", "owner-secret", map[string]string{"user_code": userCode}); w.Code != http.StatusOK { + t.Fatalf("owner approve = %d, want 200", w.Code) + } + + // Worker polls and gets a member token. + _, tok := do(t, h, "POST", "/auth/device/token", "", map[string]string{"device_code": deviceCode}) + if tok["status"] != "authorized" { + t.Fatalf("post-approval poll = %+v", tok) + } + member, _ := tok["token"].(string) + if !strings.HasPrefix(member, "obol-research-mt-") { + t.Fatalf("member token = %q", member) + } + + // Member can read the task. + if w, task := do(t, h, "GET", "/task", member, nil); w.Code != http.StatusOK || task["program"] == nil { + t.Fatalf("member /task = %d %+v", w.Code, task) + } + + // A bogus token is forbidden. 
+ if w, _ := do(t, h, "GET", "/task", "obol-research-mt-bogus", nil); w.Code != http.StatusForbidden { + t.Fatalf("bogus token /task = %d, want 403", w.Code) + } + + // Member submits a result that beats the baseline → accepted champion. + w, res := do(t, h, "POST", "/results", member, map[string]any{"worker": "spark1", "value": 1.10}) + if w.Code != http.StatusOK || res["accepted"] != true || res["champion"] != true { + t.Fatalf("submit = %d %+v", w.Code, res) + } + + // Status reflects roster + champion + payout. + _, st := do(t, h, "GET", "/status", member, nil) + if st["champion"] == nil { + t.Fatalf("status champion missing: %+v", st) + } +} + +func TestServer_OpenMembershipAutoApproves(t *testing.T) { + s := New(testProgram(), MembershipOpen, "owner", nil) + h := s.Handler() + + _, code := do(t, h, "POST", "/auth/device/code", "", map[string]string{"worker": "w"}) + deviceCode := code["device_code"].(string) + + // No owner approve step — open membership auto-approved; first poll mints. + _, tok := do(t, h, "POST", "/auth/device/token", "", map[string]string{"device_code": deviceCode}) + if tok["status"] != "authorized" || tok["token"] == "" { + t.Fatalf("open-membership poll = %+v", tok) + } +}