diff --git a/README.md b/README.md index af66387..66d09fa 100644 --- a/README.md +++ b/README.md @@ -238,6 +238,30 @@ Offline tools (`lint`, `check_migration`, `drift`) work immediately after the pu No server, no credentials. Same promise as before. +### Push snapshots to an OCI registry + +Any OCI registry can hold snapshots: GitHub Container Registry, Google Artifact Registry, Amazon ECR, Docker Hub, Harbor, or a self-hosted one. The registry handles authentication, retention, and access control, so there is no server to run. + +Authenticate the same way you would for `docker push`, register the remote, and push: + +```sh +docker login ghcr.io +dryrun remote add ghcr --ref ghcr.io/myorg/dryrun --default +dryrun snapshot take --push +``` + +`snapshot take --push` captures and publishes in one step. Consumers pull: + +```sh +dryrun snapshot pull --remote ghcr +``` + +`pull` fetches only the latest take by default, so cold pulls (fresh CI, empty `history.db`) stay cheap regardless of how much history the registry holds. Use `--full` to backfill the entire history, or `--since 7d` (also `2w`, `24h`, or a UTC date like `2026-01-01`) for a window. `push` always sends your full local history; since it is incremental by content hash, an owner that pushes on a cadence only uploads the new observations each run. + +`--ref` is the registry base. Each database gets its own repository under it, `//`, so `myapp`'s `auth` database lands at `ghcr.io/myorg/dryrun/myapp/auth`. Snapshots map to OCI artifacts addressed by content hash, so pushing the same one twice changes nothing and shared blobs deduplicate on the registry. For Google Artifact Registry, run `gcloud auth configure-docker us-docker.pkg.dev` in place of `docker login`; the rest is identical. + +See [`docs/dryrun-toml.md`](docs/dryrun-toml.md) for per-profile remotes and sharing one stream across projects. + ## MCP server Add `dryrun` to your AI assistant. If you installed via Homebrew, `dryrun` is already on your PATH: @@ -285,17 +309,14 @@ See the [Tutorial](TUTORIAL.md) for live database setup, SSE transport, and Clau - **[Tutorial](TUTORIAL.md)** for offline, online, and multi-node workflows with full tool reference - **[Multi-node statistics](docs/multi-node-stats.md)** for cluster-wide stats collection, aggregation rules, and replica imbalance detection -- **[Configuration reference](docs/dryrun-toml.md)** for `dryrun.toml` profiles, conventions, and lint rules +- **[Configuration reference](docs/dryrun-toml.md)** for `dryrun.toml` profiles, conventions, remotes, and lint rules +- **[CLI stability](docs/cli-stability.md)** for which commands are stable versus experimental - **[Security overview](SECURITY.md)** for the CLI/MCP split and masking - **[boringSQL](https://boringsql.com)**, the blog and project home +- **[dryrun project page](https://boringsql.com/products/dryrun/)**, overview and docs +- **[Don't let AI touch your production database](https://boringsql.com/posts/dont-let-ai-to-prod/)**, why most Postgres MCPs are unsafe and what `dryrun` does differently - **[RegreSQL](https://github.com/boringsql/regresql)**, SQL regression testing and **`dryrun`**'s companion tool - - -## Upgrading from 0.5.x - -- `dump-schema --stats-only` is removed. Use `dryrun snapshot take` (primary) and `dryrun snapshot activity` (replicas). -- Snapshot JSON no longer embeds `Table.stats`, `Column.stats`, `Index.stats`, or `node_stats`. Stats are read per-kind from the history db via `HistoryStore::get_annotated`. -- `check_drift` is now schema-only. It no longer flaps when `reltuples` or `idx_scan` change. +- **[Fixturize](https://github.com/boringSQL/fixturize)**, subset and mask production data for dev/test ## License diff --git a/cmd/dryrun/main.go b/cmd/dryrun/main.go index c92d408..7bd6cb0 100644 --- a/cmd/dryrun/main.go +++ b/cmd/dryrun/main.go @@ -45,7 +45,7 @@ func main() { root.AddCommand( probeCmd(), initCmd(), importCmd(), dumpSchemaCmd(), lintCmd(), driftCmd(), snapshotCmd(), profileCmd(), - mcpServeCmd(), statsCmd(), versionCmd(), + remoteCmd(), mcpServeCmd(), statsCmd(), versionCmd(), ) if err := root.Execute(); err != nil { @@ -331,6 +331,10 @@ func snapshotCmd() *cobra.Command { c.Flags().StringVar(&historyDB, "history-db", "", "history database path") } + var ( + pushAfter bool + pushRemote string + ) takeCmd := &cobra.Command{ Use: "take", Short: "Take a new snapshot (schema + planner + activity; primary only)", @@ -375,6 +379,14 @@ func snapshotCmd() *cobra.Command { } fmt.Printf("Activity stats saved: %s (label=primary, %d tables, %d indexes)\n", activity.ContentHash, len(activity.Tables), len(activity.Indexes)) + + if pushAfter { + dst, err := resolveSyncStore("", "", pushRemote) + if err != nil { + return err + } + return runSync(cmd.Context(), store, dst, false, fullScope(), os.Stdout) + } return nil }, } @@ -382,6 +394,8 @@ func snapshotCmd() *cobra.Command { takeCmd.Flags().StringVar(&flagMasksFile, "masks-file", "", "path to data-masking-policy.yml") takeCmd.Flags().StringSliceVar(&flagMaskPolicy, "mask-policy", nil, "masking policy name (repeatable, comma-separated)") takeCmd.Flags().BoolVar(&flagNoMasks, "no-masks", false, "disable planner-stats masking (raw stats land in history.db)") + takeCmd.Flags().BoolVar(&pushAfter, "push", false, "push the snapshot to a remote after capture") + takeCmd.Flags().StringVar(&pushRemote, "remote", "", "configured [[remote]] name (with --push)") listCmd := &cobra.Command{ Use: "list", diff --git a/cmd/dryrun/remote.go b/cmd/dryrun/remote.go new file mode 100644 index 0000000..08b86e0 --- /dev/null +++ b/cmd/dryrun/remote.go @@ -0,0 +1,171 @@ +package main + +import ( + "fmt" + "os" + "strings" + + "github.com/spf13/cobra" + + "github.com/boringsql/dryrun/internal/config" +) + +func remoteCmd() *cobra.Command { + cmd := &cobra.Command{Use: "remote", Short: "Manage [[remote]] entries in dryrun.toml"} + cmd.AddCommand(remoteAddCmd(), remoteListCmd(), remoteRmCmd()) + return cmd +} + +func remoteListCmd() *cobra.Command { + return &cobra.Command{ + Use: "list", + Short: "List configured remotes", + RunE: func(cmd *cobra.Command, args []string) error { + path, cfg, err := loadProjectConfig() + if err != nil { + return err + } + fmt.Printf("Config: %s\n", path) + if len(cfg.Remotes) == 0 { + fmt.Println("No remotes configured.") + return nil + } + for _, r := range cfg.Remotes { + def := "" + if r.Default { + def = " (default)" + } + fmt.Printf(" %s %s %s%s\n", r.Name, r.Type, r.Ref, def) + } + return nil + }, + } +} + +func remoteAddCmd() *cobra.Command { + var ( + typ, ref, tokenEnv string + isDefault bool + ) + cmd := &cobra.Command{ + Use: "add ", + Short: "Add a remote to dryrun.toml", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + name := args[0] + if ref == "" { + return fmt.Errorf("--ref is required") + } + path, cfg, err := loadProjectConfig() + if err != nil { + return err + } + for _, r := range cfg.Remotes { + if r.Name == name { + return fmt.Errorf("remote %q already exists", name) + } + } + block := remoteBlock(config.RemoteConfig{ + Name: name, Type: typ, Ref: ref, TokenEnv: tokenEnv, Default: isDefault, + }) + data, err := os.ReadFile(path) + if err != nil { + return err + } + out := strings.TrimRight(string(data), "\n") + "\n" + block + if err := os.WriteFile(path, []byte(out), 0o644); err != nil { + return err + } + fmt.Printf("Added remote %q -> %s\n", name, ref) + return nil + }, + } + cmd.Flags().StringVar(&typ, "type", "oci", "remote type") + cmd.Flags().StringVar(&ref, "ref", "", "registry base ref (e.g. ghcr.io/org/dryrun)") + cmd.Flags().StringVar(&tokenEnv, "token-env", "", "env var holding a bearer token") + cmd.Flags().BoolVar(&isDefault, "default", false, "mark as the default remote") + return cmd +} + +func remoteRmCmd() *cobra.Command { + return &cobra.Command{ + Use: "rm ", + Short: "Remove a remote from dryrun.toml", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + name := args[0] + path, _, err := loadProjectConfig() + if err != nil { + return err + } + data, err := os.ReadFile(path) + if err != nil { + return err + } + out, removed := removeRemoteBlock(string(data), name) + if !removed { + return fmt.Errorf("remote %q not found", name) + } + if err := os.WriteFile(path, []byte(out), 0o644); err != nil { + return err + } + fmt.Printf("Removed remote %q\n", name) + return nil + }, + } +} + +func remoteBlock(r config.RemoteConfig) string { + var b strings.Builder + b.WriteString("\n[[remote]]\n") + fmt.Fprintf(&b, "name = %q\n", r.Name) + fmt.Fprintf(&b, "type = %q\n", r.Type) + fmt.Fprintf(&b, "ref = %q\n", r.Ref) + if r.TokenEnv != "" { + fmt.Fprintf(&b, "token_env = %q\n", r.TokenEnv) + } + if r.Default { + b.WriteString("default = true\n") + } + return b.String() +} + +// drops the [[remote]] block whose name matches, plus any blank lines before it. +// a block runs from its [[remote]] header to the next table header or EOF. +func removeRemoteBlock(content, name string) (string, bool) { + lines := strings.Split(content, "\n") + var out []string + removed := false + for i := 0; i < len(lines); { + if strings.TrimSpace(lines[i]) == "[[remote]]" { + j := i + 1 + for j < len(lines) && !strings.HasPrefix(strings.TrimSpace(lines[j]), "[") { + j++ + } + if blockHasName(lines[i:j], name) { + for len(out) > 0 && strings.TrimSpace(out[len(out)-1]) == "" { + out = out[:len(out)-1] + } + removed = true + i = j + continue + } + out = append(out, lines[i:j]...) + i = j + continue + } + out = append(out, lines[i]) + i++ + } + return strings.Join(out, "\n"), removed +} + +func blockHasName(block []string, name string) bool { + for _, l := range block { + k, v, ok := strings.Cut(l, "=") + if ok && strings.TrimSpace(k) == "name" && strings.Trim(strings.TrimSpace(v), `"'`) == name { + return true + } + } + return false +} diff --git a/cmd/dryrun/snapshot_sync.go b/cmd/dryrun/snapshot_sync.go index 40ae7d3..69235c7 100644 --- a/cmd/dryrun/snapshot_sync.go +++ b/cmd/dryrun/snapshot_sync.go @@ -5,12 +5,23 @@ import ( "fmt" "io" "os" + "strconv" + "strings" + "time" "github.com/spf13/cobra" "github.com/boringsql/dryrun/internal/history" ) +// latest keeps only the newest take (per node for activity); rng is the --since window. +type pullScope struct { + latest bool + rng history.TimeRange +} + +func fullScope() pullScope { return pullScope{} } + // KindCounts splits a per-kind sync result into work done vs work skipped. type KindCounts struct { Copied int @@ -26,30 +37,29 @@ type SyncOutcome struct { func snapshotPushCmd() *cobra.Command { var ( - toPath string - all bool - historyDB string + toPath, ociRef, remoteName string + all bool + historyDB string ) cmd := &cobra.Command{ Use: "push", - Short: "Push snapshots from history.db to a filesystem store", + Short: "Push snapshots from history.db to a filesystem store or OCI registry", RunE: func(cmd *cobra.Command, args []string) error { - if toPath == "" { - return fmt.Errorf("--to-path is required") - } src, err := openHistoryStore(historyDB) if err != nil { return err } defer src.Close() - dst, err := history.NewFilesystemStore(toPath) + dst, err := resolveSyncStore(toPath, ociRef, remoteName) if err != nil { return err } - return runSync(cmd.Context(), src, dst, all, os.Stdout) + return runSync(cmd.Context(), src, dst, all, fullScope(), os.Stdout) }, } - cmd.Flags().StringVar(&toPath, "to-path", "", "destination directory (required)") + cmd.Flags().StringVar(&toPath, "to-path", "", "destination directory") + cmd.Flags().StringVar(&ociRef, "oci", "", "OCI registry base ref (e.g. ghcr.io/org/dryrun)") + cmd.Flags().StringVar(&remoteName, "remote", "", "configured [[remote]] name") cmd.Flags().BoolVar(&all, "all", false, "sync all keys from the source") cmd.Flags().StringVar(&historyDB, "history-db", "", "history database path") return cmd @@ -57,18 +67,24 @@ func snapshotPushCmd() *cobra.Command { func snapshotPullCmd() *cobra.Command { var ( - fromPath string - all bool - historyDB string + fromPath, ociRef, remoteName string + all, full bool + since string + historyDB string ) cmd := &cobra.Command{ Use: "pull", - Short: "Pull snapshots from a filesystem store into history.db", + Short: "Pull the latest snapshot from a filesystem store or OCI registry into history.db", RunE: func(cmd *cobra.Command, args []string) error { - if fromPath == "" { - return fmt.Errorf("--from-path is required") + scope := pullScope{latest: !full} + if since != "" { + from, err := parseSince(since) + if err != nil { + return err + } + scope.rng = history.TimeRange{From: &from} } - src, err := history.NewFilesystemStore(fromPath) + src, err := resolveSyncStore(fromPath, ociRef, remoteName) if err != nil { return err } @@ -77,17 +93,160 @@ func snapshotPullCmd() *cobra.Command { return err } defer dst.Close() - return runSync(cmd.Context(), src, dst, all, os.Stdout) + return runSync(cmd.Context(), src, dst, all, scope, os.Stdout) }, } - cmd.Flags().StringVar(&fromPath, "from-path", "", "source directory (required)") + cmd.Flags().StringVar(&fromPath, "from-path", "", "source directory") + cmd.Flags().StringVar(&ociRef, "oci", "", "OCI registry base ref (e.g. ghcr.io/org/dryrun)") + cmd.Flags().StringVar(&remoteName, "remote", "", "configured [[remote]] name") cmd.Flags().BoolVar(&all, "all", false, "sync all keys from the source") + cmd.Flags().BoolVar(&full, "full", false, "pull the entire history, not just the latest take") + cmd.Flags().StringVar(&since, "since", "", "only pull snapshots newer than a duration (7d, 2w, 24h) or UTC date (2006-01-02)") cmd.Flags().StringVar(&historyDB, "history-db", "", "history database path") return cmd } +// parseSince accepts a relative duration (7d, 2w, 24h) or an absolute date. +func parseSince(s string) (time.Time, error) { + s = strings.TrimSpace(s) + if s == "" { + return time.Time{}, fmt.Errorf("empty --since") + } + if d, err := parseRelative(s); err == nil { + return time.Now().Add(-d), nil + } + for _, layout := range []string{time.RFC3339, "2006-01-02 15:04:05", "2006-01-02"} { + if t, err := time.Parse(layout, s); err == nil { + return t, nil + } + } + return time.Time{}, fmt.Errorf("invalid --since %q: want duration (7d, 2w, 24h) or date (2006-01-02)", s) +} + +// parseRelative extends time.ParseDuration with day/week suffixes it lacks. +func parseRelative(s string) (time.Duration, error) { + var d time.Duration + if n := len(s); n >= 2 { + switch unit := s[n-1]; unit { + case 'd', 'w': + val, err := strconv.ParseFloat(s[:n-1], 64) + if err != nil { + return 0, err + } + hours := val * 24 + if unit == 'w' { + hours *= 7 + } + d = time.Duration(hours * float64(time.Hour)) + } + } + if d == 0 { + var err error + if d, err = time.ParseDuration(s); err != nil { + return 0, err + } + } + if d < 0 { + return 0, fmt.Errorf("negative duration: %s", s) + } + return d, nil +} + +// explicit flags win; with none, fall back to the profile's configured remote. +func resolveSyncStore(path, ociRef, remoteName string) (history.SnapshotStore, error) { + switch { + case ociRef != "" || remoteName != "": + return buildOCIStore(ociRef, remoteName) + case path != "": + return history.NewFilesystemStore(path) + default: + if r := profileRemote(); r != "" { + return buildOCIStore("", r) + } + return nil, fmt.Errorf("specify --to-path/--from-path, --oci, or --remote") + } +} + +// --oci is a direct ref; --remote resolves base+token_env from [[remote]]. +func buildOCIStore(ociRef, remoteName string) (history.SnapshotStore, error) { + base := ociRef + var tokenEnv string + if base == "" { + _, cfg, err := loadProjectConfig() + if err != nil { + return nil, err + } + r, err := cfg.ResolveRemote(remoteName) + if err != nil { + return nil, err + } + if r.Type != "" && r.Type != "oci" { + return nil, fmt.Errorf("remote %q has unsupported type %q", r.Name, r.Type) + } + base, tokenEnv = r.Ref, r.TokenEnv + } + client, err := history.NewAuthClient(history.AuthConfig{TokenEnv: tokenEnv}) + if err != nil { + return nil, err + } + return history.NewOCIStore(history.OCIConfig{ + Base: base, + Client: client, + PlainHTTP: isLocalRegistry(base), + StreamFor: streamMapper(), + }) +} + +// profileRemote is the [[remote]] name a profile pins, "" if none/unresolvable. +func profileRemote() string { + cwd, _ := os.Getwd() + _, cfg, err := loadProjectConfig() + if err != nil { + return "" + } + rp, err := cfg.ResolveProfile(nil, nil, nilIfEmpty(flagProfile), cwd) + if err != nil || rp.Remote == nil { + return "" + } + return *rp.Remote +} + +// streamMapper maps each key to its profile's stream override, else StreamSuffix. +func streamMapper() func(history.SnapshotKey) string { + cwd, _ := os.Getwd() + _, cfg, err := loadProjectConfig() + if err != nil { + return history.StreamSuffix + } + overrides := map[history.SnapshotKey]string{} + for name := range cfg.Profiles { + n := name + rp, err := cfg.ResolveProfile(nil, nil, &n, cwd) + if err != nil { + continue + } + key := rp.SnapshotKey() + if s := rp.Stream(); s != history.StreamSuffix(key) { + overrides[key] = s + } + } + return func(k history.SnapshotKey) string { + if s, ok := overrides[k]; ok { + return s + } + return history.StreamSuffix(k) + } +} + +// local registries (registry:2/zot) speak http, not https +func isLocalRegistry(ref string) bool { + return strings.HasPrefix(ref, "localhost") || + strings.HasPrefix(ref, "127.0.0.1") || + strings.HasPrefix(ref, "::1") +} + // --all takes src.ListKeys, otherwise scope is the resolved profile key. -func runSync(ctx context.Context, src, dst history.SnapshotStore, all bool, w io.Writer) error { +func runSync(ctx context.Context, src, dst history.SnapshotStore, all bool, scope pullScope, w io.Writer) error { var keys []history.SnapshotKey if all { ks, err := src.ListKeys(ctx) @@ -99,7 +258,7 @@ func runSync(ctx context.Context, src, dst history.SnapshotStore, all bool, w io keys = []history.SnapshotKey{resolveSnapshotKey()} } - outs, err := syncKeys(ctx, src, dst, keys) + outs, err := syncKeys(ctx, src, dst, keys, scope) if err != nil { return err } @@ -109,12 +268,16 @@ func runSync(ctx context.Context, src, dst history.SnapshotStore, all bool, w io // syncKeys diffs src vs dst by content_hash per kind and copies the gap // in schema -> planner -> activity order. -func syncKeys(ctx context.Context, src, dst history.SnapshotStore, keys []history.SnapshotKey) ([]SyncOutcome, error) { +func syncKeys(ctx context.Context, src, dst history.SnapshotStore, keys []history.SnapshotKey, scope pullScope) ([]SyncOutcome, error) { out := make([]SyncOutcome, 0, len(keys)) for _, key := range keys { o := SyncOutcome{Key: key} + sel, err := selectSnapshots(ctx, src, key, scope) + if err != nil { + return out, fmt.Errorf("select %s/%s: %w", key.ProjectID, key.DatabaseID, err) + } for _, kind := range kindOrder() { - c, err := syncKind(ctx, src, dst, key, kind) + c, err := syncKindList(ctx, src, dst, key, kind, sel[kind.Tag]) if err != nil { return out, fmt.Errorf("sync %s/%s %s: %w", key.ProjectID, key.DatabaseID, kind, err) @@ -133,6 +296,113 @@ func syncKeys(ctx context.Context, src, dst history.SnapshotStore, keys []histor return out, nil } +// schema dedups, so a recent take can reference a schema older than --since; +// resolve schema by ref over the full list, not by the time window. +func selectSnapshots(ctx context.Context, src history.SnapshotStore, key history.SnapshotKey, scope pullScope) (map[history.SnapshotKindTag][]history.SnapshotSummary, error) { + planner, err := src.List(ctx, key, history.PlannerKind(), scope.rng) + if err != nil { + return nil, err + } + activity, err := src.List(ctx, key, history.ActivityKind(""), scope.rng) + if err != nil { + return nil, err + } + allSchema, err := src.List(ctx, key, history.SchemaKind(), history.TimeRange{}) + if err != nil { + return nil, err + } + + if scope.latest { + planner = newestPerNode(planner) + activity = newestPerNode(activity) + } + + need := make(map[string]struct{}, len(planner)+len(activity)) + for _, s := range planner { + need[s.SchemaRefHash] = struct{}{} + } + for _, s := range activity { + need[s.SchemaRefHash] = struct{}{} + } + schema := filterByContentHash(allSchema, need) + + switch { + case scope.latest && len(schema) == 0: + // schema-only stream: nothing references a schema, pull the newest one. + schema = newestPerNode(allSchema) + case !scope.latest: + // full mode also keeps schemas captured within the window itself + // (e.g. a schema-only take), unioned with the referenced ones. + schema = unionByContentHash(schema, windowed(allSchema, scope.rng)) + } + + return map[history.SnapshotKindTag][]history.SnapshotSummary{ + history.KindSchema: schema, + history.KindPlanner: planner, + history.KindActivity: activity, + }, nil +} + +// newest summary per node label; equal-second ties break on hash for determinism. +func newestPerNode(list []history.SnapshotSummary) []history.SnapshotSummary { + best := map[string]history.SnapshotSummary{} + for _, s := range list { + cur, ok := best[s.Kind.NodeLabel] + if !ok || s.Timestamp.After(cur.Timestamp) || + (s.Timestamp.Equal(cur.Timestamp) && s.ContentHash > cur.ContentHash) { + best[s.Kind.NodeLabel] = s + } + } + out := make([]history.SnapshotSummary, 0, len(best)) + for _, s := range best { + out = append(out, s) + } + return out +} + +func filterByContentHash(list []history.SnapshotSummary, want map[string]struct{}) []history.SnapshotSummary { + var out []history.SnapshotSummary + for _, s := range list { + if _, ok := want[s.ContentHash]; ok { + out = append(out, s) + } + } + return out +} + +// windowed mirrors the store's range semantics: From inclusive, To exclusive. +func windowed(list []history.SnapshotSummary, rng history.TimeRange) []history.SnapshotSummary { + if rng.From == nil && rng.To == nil { + return list + } + var out []history.SnapshotSummary + for _, s := range list { + if rng.From != nil && s.Timestamp.Before(*rng.From) { + continue + } + if rng.To != nil && !s.Timestamp.Before(*rng.To) { + continue + } + out = append(out, s) + } + return out +} + +func unionByContentHash(a, b []history.SnapshotSummary) []history.SnapshotSummary { + seen := make(map[string]struct{}, len(a)+len(b)) + out := make([]history.SnapshotSummary, 0, len(a)+len(b)) + for _, list := range [][]history.SnapshotSummary{a, b} { + for _, s := range list { + if _, ok := seen[s.ContentHash]; ok { + continue + } + seen[s.ContentHash] = struct{}{} + out = append(out, s) + } + } + return out +} + // activity uses an empty NodeLabel so List returns every node's row in one pass. func kindOrder() []history.SnapshotKind { return []history.SnapshotKind{ @@ -142,13 +412,9 @@ func kindOrder() []history.SnapshotKind { } } -func syncKind(ctx context.Context, src, dst history.SnapshotStore, key history.SnapshotKey, kind history.SnapshotKind) (KindCounts, error) { +// dst is listed unwindowed so rows already present outside scope.rng count as up-to-date. +func syncKindList(ctx context.Context, src, dst history.SnapshotStore, key history.SnapshotKey, kind history.SnapshotKind, srcList []history.SnapshotSummary) (KindCounts, error) { var counts KindCounts - - srcList, err := src.List(ctx, key, kind, history.TimeRange{}) - if err != nil { - return counts, err - } if len(srcList) == 0 { return counts, nil } diff --git a/cmd/dryrun/snapshot_sync_test.go b/cmd/dryrun/snapshot_sync_test.go index e46f3f3..c57df40 100644 --- a/cmd/dryrun/snapshot_sync_test.go +++ b/cmd/dryrun/snapshot_sync_test.go @@ -99,7 +99,7 @@ func TestSyncKeysCopiesEverythingToEmptyDst(t *testing.T) { t.Fatal(err) } - outs, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}) + outs, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}, fullScope()) if err != nil { t.Fatalf("syncKeys: %v", err) } @@ -147,7 +147,7 @@ func TestSyncKeysReportsUpToDateForMatchingHashes(t *testing.T) { t.Fatal(err) } - outs, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}) + outs, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}, fullScope()) if err != nil { t.Fatalf("syncKeys: %v", err) } @@ -200,7 +200,7 @@ func TestSyncCopiesActivityPerNodeLabel(t *testing.T) { } } - outs, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}) + outs, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}, fullScope()) if err != nil { t.Fatalf("syncKeys: %v", err) } @@ -251,7 +251,7 @@ func TestSyncKindOrderIsSchemaPlannerActivity(t *testing.T) { t.Fatal(err) } - if _, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}); err != nil { + if _, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}, fullScope()); err != nil { t.Fatalf("syncKeys against FilesystemStore dst: %v", err) } } @@ -273,7 +273,7 @@ func TestSyncAllUsesListKeys(t *testing.T) { } var buf bytes.Buffer - if err := runSync(ctx, src, dst, true, &buf); err != nil { + if err := runSync(ctx, src, dst, true, fullScope(), &buf); err != nil { t.Fatalf("runSync(all=true): %v", err) } out := buf.String() @@ -312,10 +312,10 @@ func TestRoundTripSQLiteToFsToSQLite(t *testing.T) { } } - if _, err := syncKeys(ctx, srcA, fsMid, []history.SnapshotKey{k}); err != nil { + if _, err := syncKeys(ctx, srcA, fsMid, []history.SnapshotKey{k}, fullScope()); err != nil { t.Fatalf("push A -> FS: %v", err) } - if _, err := syncKeys(ctx, fsMid, dstB, []history.SnapshotKey{k}); err != nil { + if _, err := syncKeys(ctx, fsMid, dstB, []history.SnapshotKey{k}, fullScope()); err != nil { t.Fatalf("pull FS -> B: %v", err) } @@ -359,3 +359,220 @@ func TestPrintSyncOutcomesEmpty(t *testing.T) { t.Errorf("got %q, want a 'No keys to sync' notice", buf.String()) } } + +// seedTwoTakes lays down two distinct takes (older t0, newer t1) under key k, +// each with its own schema + planner and one activity row per node (primary + +// replica). Returns (t0, t1). Used by the latest/full/since selection tests. +func seedTwoTakes(t *testing.T, ctx context.Context, src *history.Store, k history.SnapshotKey) (time.Time, time.Time) { + t.Helper() + t1 := time.Now().UTC().Truncate(time.Second) + t0 := t1.Add(-24 * time.Hour) + + for _, take := range []struct { + ts time.Time + sh, pl, suf string + }{ + {t0, "sh-0", "pl-0", "0"}, + {t1, "sh-1", "pl-1", "1"}, + } { + if _, err := src.PutSchema(ctx, k, syncTestSchema(take.sh, "appdb", take.ts)); err != nil { + t.Fatal(err) + } + if _, err := src.PutPlanner(ctx, k, syncTestPlanner(take.sh, take.pl, "appdb", take.ts)); err != nil { + t.Fatal(err) + } + if _, err := src.PutActivity(ctx, k, syncTestActivity(take.sh, "ac-p"+take.suf, "primary", take.ts, false)); err != nil { + t.Fatal(err) + } + if _, err := src.PutActivity(ctx, k, syncTestActivity(take.sh, "ac-r"+take.suf, "replica", take.ts, true)); err != nil { + t.Fatal(err) + } + } + return t0, t1 +} + +// Latest scope (the pull default) must copy exactly the newest take: one +// schema, one planner, and one activity row PER NODE — never the older take. +// The per-node count (2, not 1) guards that newestPerNode groups by node label +// rather than collapsing all activity to a single newest row. +func TestSyncLatestSelectsNewestTakePerNode(t *testing.T) { + ctx := context.Background() + src := openSQLite(t) + dst := openSQLite(t) + k := syncKey("acme", "primary") + seedTwoTakes(t, ctx, src, k) + + outs, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}, pullScope{latest: true}) + if err != nil { + t.Fatalf("syncKeys latest: %v", err) + } + o := outs[0] + if o.Schema.Copied != 1 { + t.Errorf("schema copied = %d, want 1 (newest take only)", o.Schema.Copied) + } + if o.Planner.Copied != 1 { + t.Errorf("planner copied = %d, want 1 (newest take only)", o.Planner.Copied) + } + if o.Activity.Copied != 2 { + t.Errorf("activity copied = %d, want 2 (newest take, one row per node)", o.Activity.Copied) + } +} + +// Full scope backfills both takes: 2 schema, 2 planner, 4 activity (2 nodes x 2 takes). +func TestSyncFullScopeCopiesAllTakes(t *testing.T) { + ctx := context.Background() + src := openSQLite(t) + dst := openSQLite(t) + k := syncKey("acme", "primary") + seedTwoTakes(t, ctx, src, k) + + outs, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}, fullScope()) + if err != nil { + t.Fatalf("syncKeys full: %v", err) + } + o := outs[0] + if o.Schema.Copied != 2 || o.Planner.Copied != 2 || o.Activity.Copied != 4 { + t.Errorf("full copied = schema %d / planner %d / activity %d, want 2/2/4", + o.Schema.Copied, o.Planner.Copied, o.Activity.Copied) + } +} + +// --since with a window between the two takes drops the older take even in +// full mode (the range bounds the source list before selection). +func TestSyncSinceWindowExcludesOlderTake(t *testing.T) { + ctx := context.Background() + src := openSQLite(t) + dst := openSQLite(t) + k := syncKey("acme", "primary") + _, t1 := seedTwoTakes(t, ctx, src, k) + + cutoff := t1.Add(-time.Hour) // after t0, before t1 + scope := pullScope{rng: history.TimeRange{From: &cutoff}} + outs, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}, scope) + if err != nil { + t.Fatalf("syncKeys since: %v", err) + } + o := outs[0] + if o.Schema.Copied != 1 || o.Planner.Copied != 1 || o.Activity.Copied != 2 { + t.Errorf("since copied = schema %d / planner %d / activity %d, want 1/1/2 (newest take only)", + o.Schema.Copied, o.Planner.Copied, o.Activity.Copied) + } +} + +// Regression: a stable schema keeps its original (old) timestamp, so a take +// captured today can reference a schema from 30 days ago. Pulling --since 7d +// must still bring that old schema along, or the planner/activity land +// orphaned (their schema_ref_hash resolves to nothing locally). Both latest +// and full mode must resolve the referenced schema unwindowed. +func TestSyncSincePullsReferencedSchemaOlderThanWindow(t *testing.T) { + for _, full := range []bool{false, true} { + ctx := context.Background() + src := openSQLite(t) + dst := openSQLite(t) + k := syncKey("acme", "primary") + + now := time.Now().UTC().Truncate(time.Second) + old := now.Add(-30 * 24 * time.Hour) + if _, err := src.PutSchema(ctx, k, syncTestSchema("sh-old", "appdb", old)); err != nil { + t.Fatal(err) + } + // today's take binds to the unchanged 30-day-old schema. + if _, err := src.PutPlanner(ctx, k, syncTestPlanner("sh-old", "pl-now", "appdb", now)); err != nil { + t.Fatal(err) + } + if _, err := src.PutActivity(ctx, k, syncTestActivity("sh-old", "ac-p", "primary", now, false)); err != nil { + t.Fatal(err) + } + if _, err := src.PutActivity(ctx, k, syncTestActivity("sh-old", "ac-r", "replica", now, true)); err != nil { + t.Fatal(err) + } + + cutoff := now.Add(-7 * 24 * time.Hour) // excludes the 30-day-old schema row + scope := pullScope{latest: !full, rng: history.TimeRange{From: &cutoff}} + outs, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}, scope) + if err != nil { + t.Fatalf("full=%v syncKeys: %v", full, err) + } + o := outs[0] + if o.Schema.Copied != 1 { + t.Errorf("full=%v: schema copied = %d, want 1 (referenced schema pulled despite being out of window)", full, o.Schema.Copied) + } + if o.Planner.Copied != 1 || o.Activity.Copied != 2 { + t.Errorf("full=%v: planner %d / activity %d, want 1/2", full, o.Planner.Copied, o.Activity.Copied) + } + } +} + +// A schema-only stream (no planner/activity) still pulls its current schema in +// latest mode even when --since predates it: latest always includes the most +// recent known schema so local state is never left without one. +func TestSyncLatestSchemaOnlyStreamPullsCurrentSchema(t *testing.T) { + ctx := context.Background() + src := openSQLite(t) + dst := openSQLite(t) + k := syncKey("acme", "primary") + + old := time.Now().UTC().Truncate(time.Second).Add(-30 * 24 * time.Hour) + if _, err := src.PutSchema(ctx, k, syncTestSchema("sh-old", "appdb", old)); err != nil { + t.Fatal(err) + } + + cutoff := old.Add(7 * 24 * time.Hour) // window starts after the schema + scope := pullScope{latest: true, rng: history.TimeRange{From: &cutoff}} + outs, err := syncKeys(ctx, src, dst, []history.SnapshotKey{k}, scope) + if err != nil { + t.Fatalf("syncKeys: %v", err) + } + if o := outs[0]; o.Schema.Copied != 1 || o.Planner.Copied != 0 || o.Activity.Copied != 0 { + t.Errorf("schema-only latest = schema %d / planner %d / activity %d, want 1/0/0", + o.Schema.Copied, o.Planner.Copied, o.Activity.Copied) + } +} + +func TestParseSince(t *testing.T) { + ref := time.Now() + cases := []struct { + in string + wantErr bool + // approxAgo is the expected age of the result for relative inputs. + approxAgo time.Duration + // absolute is the exact expected instant for date inputs. + absolute *time.Time + }{ + {in: "7d", approxAgo: 7 * 24 * time.Hour}, + {in: "2w", approxAgo: 14 * 24 * time.Hour}, + {in: "24h", approxAgo: 24 * time.Hour}, + {in: "90m", approxAgo: 90 * time.Minute}, + {in: "1h30m", approxAgo: 90 * time.Minute}, + {in: "1.5d", approxAgo: 36 * time.Hour}, + {in: "-7d", wantErr: true}, + {in: "garbage", wantErr: true}, + {in: "", wantErr: true}, + } + for _, c := range cases { + got, err := parseSince(c.in) + if c.wantErr { + if err == nil { + t.Errorf("parseSince(%q): want error, got %v", c.in, got) + } + continue + } + if err != nil { + t.Errorf("parseSince(%q): unexpected error %v", c.in, err) + continue + } + ago := ref.Sub(got) + if d := ago - c.approxAgo; d < -2*time.Second || d > 2*time.Second { + t.Errorf("parseSince(%q): age %v, want ~%v", c.in, ago, c.approxAgo) + } + } + + // absolute date parses to that calendar day at UTC midnight. + got, err := parseSince("2026-01-02") + if err != nil { + t.Fatalf("parseSince(date): %v", err) + } + if got.Year() != 2026 || got.Month() != 1 || got.Day() != 2 { + t.Errorf("parseSince(date) = %v, want 2026-01-02", got) + } +} diff --git a/docs/cli-stability.md b/docs/cli-stability.md new file mode 100644 index 0000000..a497b7e --- /dev/null +++ b/docs/cli-stability.md @@ -0,0 +1,20 @@ +# CLI stability + +dryrun groups its command surface into stability tiers. Stable commands keep their flags and output shape across minor releases. Experimental commands may change while the design settles, so pin a version if you script against them. + +## Experimental: OCI remotes + +OCI registry storage shipped recently. The flags and the on-registry layout may change before they are marked stable. + +| Surface | Purpose | +|---------|---------| +| `dryrun remote add\|list\|rm` | Manage `[[remote]]` entries in dryrun.toml. | +| `dryrun snapshot push --oci\|--remote` | Push snapshots to an OCI registry. | +| `dryrun snapshot pull --oci\|--remote` | Pull snapshots from an OCI registry. | +| `dryrun snapshot take --push [--remote]` | Capture and push in one step. | + +`--oci ` targets a registry base path directly. `--remote ` resolves the base and credentials from a configured `[[remote]]`. The `--all` flag on push and pull syncs every key in the source rather than the resolved profile's. + +The artifact format carries its version in the media type (`application/vnd.dryrun.bundle.v1+zstd`). A future format would use a new version rather than reinterpreting v1, so older snapshots stay readable. + +See [dryrun-toml.md](dryrun-toml.md) for the `[[remote]]` block and the per-profile `remote` and `stream` keys. diff --git a/docs/dryrun-toml.md b/docs/dryrun-toml.md index 7e3aed0..ec419cd 100644 --- a/docs/dryrun-toml.md +++ b/docs/dryrun-toml.md @@ -78,6 +78,117 @@ Relative paths in `schema_file` are resolved from the project root (the director db_url = "postgres://${DB_USER}:${DB_PASS}@${DB_HOST}:5432/myapp" ``` +## Remotes + +A remote is an OCI registry that holds snapshots for sharing. The point is to capture once and distribute: someone with database access takes a snapshot and pushes it; everyone else, and CI, pulls it and works offline. No one else needs credentials to the database. + +The lifecycle is three commands: + +```sh +dryrun snapshot take --push # capture from the database and publish +dryrun snapshot pull # fetch the latest snapshots into local history +dryrun snapshot push # publish snapshots already in local history +``` + +`push` and `pull` read the remote from the active profile, or take `--remote ` to pick one, or `--oci ` to target a registry directly without configuring anything. The rest of this section is how you configure a remote so you don't repeat `--oci` every time. + +Any registry works: GitHub Container Registry, Google Artifact Registry, Amazon ECR, Docker Hub, Harbor, or a self-hosted `registry:2`/zot. A remote is one `[[remote]]` block: + +```toml +[[remote]] +name = "ghcr" +type = "oci" +ref = "ghcr.io/myorg/dryrun" +default = true +``` + +| Key | Meaning | +|-----|---------| +| `name` | How you refer to it: `--remote ghcr`. | +| `type` | `oci`. The only type today; defaults to `oci` when omitted. | +| `ref` | Registry base path. The repository is `//`. | +| `token_env` | Name of an environment variable holding a bearer token. Optional. | +| `default` | Use this remote when `--remote` is omitted. | + +`ref` names the registry and a base repository, not the final location. A snapshot belongs to a database, so dryrun appends `/`. With the remote above, `myapp`'s `auth` database lands at `ghcr.io/myorg/dryrun/myapp/auth`. One `ref` covers every project and database, each in its own repository. That trailing path is the `stream`, which a profile can [override](#per-profile-remote-and-stream). + +Declare more than one and mark one as the default: + +```toml +[[remote]] +name = "ghcr" +ref = "ghcr.io/myorg/dryrun" +default = true + +[[remote]] +name = "gar" +ref = "us-docker.pkg.dev/myproj/dryrun" +``` + +With a single remote, `--remote` is optional and resolves to it. With several, push and pull use the one marked `default`; pass `--remote` to pick another. + +Add and remove remotes from the command line instead of editing the file by hand: + +```sh +dryrun remote add ghcr --ref ghcr.io/myorg/dryrun --default +dryrun remote list +dryrun remote rm ghcr +``` + +### Authentication + +By default dryrun reads `~/.docker/config.json` and any configured credential helpers, so whatever authenticates `docker push` authenticates dryrun: + +```sh +docker login ghcr.io +gcloud auth configure-docker us-docker.pkg.dev +``` + +For a static bearer token, set `token_env` to the name of the variable that holds it: + +```toml +[[remote]] +name = "ci" +ref = "ghcr.io/myorg/dryrun" +token_env = "GHCR_TOKEN" +``` + +dryrun reads the token from `$GHCR_TOKEN` at push and pull time, so it never appears in the config file. A `token_env` that names an unset variable is an error, not a silent fall back to anonymous access. + +### Per-profile remote and stream + +A profile can pin a remote and override where its snapshots are stored: + +```toml +[profiles.prod-auth] +db_url = "${PROD_AUTH_DATABASE_URL}" +database_id = "auth" +remote = "ghcr" +stream = "shared/auth" +``` + +`remote` is the remote used when you run `snapshot push` or `pull` under this profile without `--remote`. + +`stream` is the repository path suffix under the remote's `ref`. It defaults to `/`, the same layout the filesystem store uses, so anything already pushed keeps resolving. It changes the storage location only: the local snapshot key stays `(project_id, database_id)`, so `lint`, `drift`, `take`, and `list` against local history are unaffected. + +### Sharing a database across projects + +A snapshot describes a database, not a project. When two projects point at the same physical database, give both the same `stream` so they read and write one shared history instead of two near-duplicates: + +```toml +# project A # project B +[profiles.auth] [profiles.auth] +db_url = "postgres://.../auth" db_url = "postgres://.../auth" +database_id = "auth" database_id = "auth" +remote = "gar" remote = "gar" +stream = "shared/auth" stream = "shared/auth" +``` + +Both profiles resolve to one repository, so the schema is stored once. Two rules come with a shared stream: + +- One owner takes and pushes; everyone else pulls. If both projects take snapshots on a schedule, the schema blob deduplicates but the differing timestamps and activity rows do not, leaving two near-duplicate observations per interval. Designate one CI job as the owner. +- Retention and access live on the shared repository. Cleanup policies and IAM are per repository: grant the owner write and consumers read on `shared/auth`. + ## Conventions These control what `dryrun lint` checks. Skip the whole section to use the defaults. @@ -224,10 +335,16 @@ schema_file = ".dryrun/schema.json" [profiles.dev] db_url = "${DEV_DATABASE_URL}" +remote = "ghcr" [profiles.staging] schema_file = ".dryrun/staging-schema.json" +[[remote]] +name = "ghcr" +ref = "ghcr.io/myorg/dryrun" +default = true + [conventions] table_name = "snake_singular" column_name = "snake_case" diff --git a/go.mod b/go.mod index b654846..73aa61a 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,12 @@ require ( github.com/jackc/pgx/v5 v5.9.1 github.com/klauspost/compress v1.18.6 github.com/mark3labs/mcp-go v0.45.0 + github.com/opencontainers/go-digest v1.0.0 + github.com/opencontainers/image-spec v1.1.1 github.com/pganalyze/pg_query_go/v6 v6.2.2 github.com/spf13/cobra v1.10.2 modernc.org/sqlite v1.47.0 + oras.land/oras-go/v2 v2.6.0 ) require ( diff --git a/go.sum b/go.sum index b6339be..fce29ae 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,10 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= +github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= github.com/pganalyze/pg_query_go/v6 v6.2.2 h1:O0L6zMC226R82RF3X5n0Ki6HjytDsoAzuzp4ATVAHNo= github.com/pganalyze/pg_query_go/v6 v6.2.2/go.mod h1:Cn6+j4870kJz3iYNsb0VsNG04vpSWgEvBwc590J4qD0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -123,3 +127,5 @@ modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= +oras.land/oras-go/v2 v2.6.0 h1:X4ELRsiGkrbeox69+9tzTu492FMUu7zJQW6eJU+I2oc= +oras.land/oras-go/v2 v2.6.0/go.mod h1:magiQDfG6H1O9APp+rOsvCPcW1GD2MM7vgnKY0Y+u1o= diff --git a/internal/config/config.go b/internal/config/config.go index b40241d..d62d75f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -20,6 +20,16 @@ type ( Conventions *ConventionsConfig `toml:"conventions"` Services *ServicesConfig `toml:"services"` RequireMasks *bool `toml:"require_masks"` + Remotes []RemoteConfig `toml:"remote"` + } + + // [[remote]] block; Ref is the registry base, e.g. ghcr.io/org/dryrun + RemoteConfig struct { + Name string `toml:"name"` + Type string `toml:"type"` + Ref string `toml:"ref"` + TokenEnv string `toml:"token_env"` + Default bool `toml:"default"` } ProjectMeta struct { @@ -40,6 +50,8 @@ type ( DatabaseID *string `toml:"database_id"` MasksFile *string `toml:"masks_file"` MaskPolicies []string `toml:"mask_policies"` + Remote *string `toml:"remote"` + Stream *string `toml:"stream"` } ConventionsConfig struct { @@ -65,16 +77,58 @@ type ( } ResolvedProfile struct { - Name string - DBURL *string - SchemaFile *string - ProjectID history.ProjectId - DatabaseID *history.DatabaseId - MasksFile *string - MaskPolicies []string + Name string + DBURL *string + SchemaFile *string + ProjectID history.ProjectId + DatabaseID *history.DatabaseId + MasksFile *string + MaskPolicies []string + Remote *string + streamOverride *string } ) +// Stream is the remote repo-path suffix: explicit override or the default +// / (shared with BundleDir so remote/local layouts match). +func (r *ResolvedProfile) Stream() string { + if r.streamOverride != nil && *r.streamOverride != "" { + return *r.streamOverride + } + return history.StreamSuffix(r.SnapshotKey()) +} + +// ResolveRemote: by name, else the sole remote, else the sole default. +func (c *ProjectConfig) ResolveRemote(name string) (*RemoteConfig, error) { + if name != "" { + for i := range c.Remotes { + if c.Remotes[i].Name == name { + return &c.Remotes[i], nil + } + } + return nil, fmt.Errorf("remote %q not found in dryrun.toml", name) + } + switch len(c.Remotes) { + case 0: + return nil, fmt.Errorf("no remotes configured in dryrun.toml") + case 1: + return &c.Remotes[0], nil + } + var def *RemoteConfig + for i := range c.Remotes { + if c.Remotes[i].Default { + if def != nil { + return nil, fmt.Errorf("multiple default remotes; pass --remote") + } + def = &c.Remotes[i] + } + } + if def == nil { + return nil, fmt.Errorf("multiple remotes and no default; pass --remote") + } + return def, nil +} + func (r *ResolvedProfile) SnapshotKey() history.SnapshotKey { did := history.DatabaseId(string(r.ProjectID)) if r.DatabaseID != nil { @@ -251,6 +305,8 @@ func resolveProfileConfig(name string, profile *ProfileConfig, projectRoot strin rp.MasksFile = &p } rp.MaskPolicies = profile.MaskPolicies + rp.Remote = profile.Remote + rp.streamOverride = profile.Stream return rp } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index c3dc0d6..e5f6d17 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -500,6 +500,221 @@ db_url = "postgres://dev/x" } } +// TestParseRemotes: the [[remote]] array-of-tables must decode onto +// ProjectConfig.Remotes with every field surfaced. A remote that omits +// `type` leaves it empty (the OCI store treats "" as oci) and `default` +// defaults to false — both are exercised here so a half-specified remote +// can't silently acquire fields it didn't ask for. +func TestParseRemotes(t *testing.T) { + toml := ` +[[remote]] +name = "gar" +type = "oci" +ref = "us-docker.pkg.dev/proj/dryrun" +token_env = "GAR_TOKEN" +default = true + +[[remote]] +name = "ghcr" +ref = "ghcr.io/org/dryrun" +` + cfg, err := Parse(toml) + if err != nil { + t.Fatal(err) + } + if len(cfg.Remotes) != 2 { + t.Fatalf("expected 2 remotes, got %d", len(cfg.Remotes)) + } + gar := cfg.Remotes[0] + if gar.Name != "gar" || gar.Type != "oci" || gar.Ref != "us-docker.pkg.dev/proj/dryrun" { + t.Errorf("gar fields: %+v", gar) + } + if gar.TokenEnv != "GAR_TOKEN" || !gar.Default { + t.Errorf("gar token_env/default: %+v", gar) + } + ghcr := cfg.Remotes[1] + if ghcr.Type != "" || ghcr.TokenEnv != "" || ghcr.Default { + t.Errorf("ghcr should leave optional fields zero: %+v", ghcr) + } +} + +// TestProfileRemoteStreamBinding: the per-profile `remote` and `stream` keys +// must decode onto ProfileConfig and then flow through ResolveProfile onto +// ResolvedProfile. `stream` is a storage-location override only, so it must +// not perturb the SnapshotKey — that stays keyed by (project, database). +func TestProfileRemoteStreamBinding(t *testing.T) { + toml := ` +[project] +id = "demo" + +[profiles.auth] +db_url = "postgres://auth/x" +database_id = "auth" +remote = "gar" +stream = "shared/auth" +` + cfg, err := Parse(toml) + if err != nil { + t.Fatal(err) + } + + // raw decode lands verbatim on ProfileConfig + p := cfg.Profiles["auth"] + if p.Remote == nil || *p.Remote != "gar" { + t.Errorf("ProfileConfig.Remote: got %v, want gar", p.Remote) + } + if p.Stream == nil || *p.Stream != "shared/auth" { + t.Errorf("ProfileConfig.Stream: got %v, want shared/auth", p.Stream) + } + + name := "auth" + rp, err := cfg.ResolveProfile(nil, nil, &name, "/tmp/demo") + if err != nil { + t.Fatal(err) + } + if rp.Remote == nil || *rp.Remote != "gar" { + t.Errorf("ResolvedProfile.Remote: got %v, want gar", rp.Remote) + } + if rp.Stream() != "shared/auth" { + t.Errorf("Stream() override: got %q, want shared/auth", rp.Stream()) + } + // the override must not leak into the local key + if k := rp.SnapshotKey(); k != (history.SnapshotKey{ProjectID: "demo", DatabaseID: "auth"}) { + t.Errorf("stream override perturbed SnapshotKey: %+v", k) + } +} + +// TestStreamDefault: with no `stream` set, Stream() must reproduce +// history.StreamSuffix(key) byte-for-byte — the default / +// layout that BundleDir already uses, so anything already pushed keeps +// resolving. An empty-string `stream` is treated as "unset", not as a literal +// empty repo path. +func TestStreamDefault(t *testing.T) { + cfg, err := Parse(` +[project] +id = "demo" + +[profiles.auth] +db_url = "postgres://auth/x" +database_id = "auth" + +[profiles.empty] +db_url = "postgres://e/x" +database_id = "e" +stream = "" +`) + if err != nil { + t.Fatal(err) + } + + name := "auth" + rp, err := cfg.ResolveProfile(nil, nil, &name, "/tmp/demo") + if err != nil { + t.Fatal(err) + } + want := history.StreamSuffix(rp.SnapshotKey()) + if want != "demo/auth" { + t.Fatalf("precondition: StreamSuffix got %q, want demo/auth", want) + } + if rp.Stream() != want { + t.Errorf("default Stream(): got %q, want %q", rp.Stream(), want) + } + + empty := "empty" + rp, err = cfg.ResolveProfile(nil, nil, &empty, "/tmp/demo") + if err != nil { + t.Fatal(err) + } + if rp.Stream() != history.StreamSuffix(rp.SnapshotKey()) { + t.Errorf("empty stream should fall back to default, got %q", rp.Stream()) + } +} + +// TestResolveRemote walks the three-way selection rule: +// - explicit name → that remote (or a name-bearing error on a typo) +// - no name, single remote → the sole remote +// - no name, many remotes → the one marked default, else an error +// - no remotes at all → an error +func TestResolveRemote(t *testing.T) { + many, _ := Parse(` +[[remote]] +name = "gar" +ref = "us-docker.pkg.dev/p/dryrun" +default = true + +[[remote]] +name = "ghcr" +ref = "ghcr.io/org/dryrun" +`) + if r, err := many.ResolveRemote("ghcr"); err != nil || r.Name != "ghcr" { + t.Errorf("by name: got %v, %v", r, err) + } + if r, err := many.ResolveRemote(""); err != nil || r.Name != "gar" { + t.Errorf("sole default: got %v, %v", r, err) + } + if _, err := many.ResolveRemote("nope"); err == nil || !contains(err.Error(), "nope") { + t.Errorf("typo must name the remote: %v", err) + } + + one, _ := Parse(` +[[remote]] +name = "only" +ref = "ghcr.io/org/dryrun" +`) + if r, err := one.ResolveRemote(""); err != nil || r.Name != "only" { + t.Errorf("single remote: got %v, %v", r, err) + } + + ambiguous, _ := Parse(` +[[remote]] +name = "a" +ref = "ghcr.io/a" + +[[remote]] +name = "b" +ref = "ghcr.io/b" +`) + if _, err := ambiguous.ResolveRemote(""); err == nil { + t.Error("expected error for many remotes and no default") + } + + none, _ := Parse(`[project] +id = "x"`) + if _, err := none.ResolveRemote(""); err == nil { + t.Error("expected error when no remotes configured") + } +} + +// TestRemoteStreamBackwardsCompat: a config with none of the new fields must +// resolve exactly as before — no Remote pinned, Stream() at the default. This +// pins the additive guarantee: existing dryrun.toml files keep their behavior. +func TestRemoteStreamBackwardsCompat(t *testing.T) { + cfg, err := Parse(` +[project] +id = "demo" + +[profiles.dev] +db_url = "postgres://dev/x" +`) + if err != nil { + t.Fatal(err) + } + if len(cfg.Remotes) != 0 { + t.Errorf("expected no remotes, got %d", len(cfg.Remotes)) + } + name := "dev" + rp, err := cfg.ResolveProfile(nil, nil, &name, "/tmp/demo") + if err != nil { + t.Fatal(err) + } + if rp.Remote != nil { + t.Errorf("expected nil Remote, got %q", *rp.Remote) + } + if rp.Stream() != history.StreamSuffix(rp.SnapshotKey()) { + t.Errorf("Stream() should equal the default, got %q", rp.Stream()) + } +} + func TestLintConfigFromConventions(t *testing.T) { toml := ` [conventions] diff --git a/internal/history/bundle_codec.go b/internal/history/bundle_codec.go new file mode 100644 index 0000000..8555762 --- /dev/null +++ b/internal/history/bundle_codec.go @@ -0,0 +1,45 @@ +package history + +import ( + "encoding/json" + "fmt" + + "github.com/klauspost/compress/zstd" + + "github.com/boringsql/dryrun/internal/schema" +) + +// wire format is JSON+zstd; keep encode/decode here so every backend emits identical bytes +func EncodeBundle(b *Bundle) ([]byte, error) { + raw, err := json.Marshal(b) + if err != nil { + return nil, err + } + enc, err := zstd.NewWriter(nil) + if err != nil { + return nil, err + } + defer enc.Close() + return enc.EncodeAll(raw, nil), nil +} + +// normalize nil Activity to empty so callers can index without a nil check +func DecodeBundle(raw []byte) (*Bundle, error) { + dec, err := zstd.NewReader(nil) + if err != nil { + return nil, err + } + defer dec.Close() + plain, err := dec.DecodeAll(raw, nil) + if err != nil { + return nil, fmt.Errorf("decompress bundle: %w", err) + } + var b Bundle + if err := json.Unmarshal(plain, &b); err != nil { + return nil, fmt.Errorf("parse bundle: %w", err) + } + if b.Activity == nil { + b.Activity = map[string]*schema.ActivityStatsSnapshot{} + } + return &b, nil +} diff --git a/internal/history/bundle_codec_test.go b/internal/history/bundle_codec_test.go new file mode 100644 index 0000000..3aed8a9 --- /dev/null +++ b/internal/history/bundle_codec_test.go @@ -0,0 +1,129 @@ +package history + +import ( + "reflect" + "testing" + + "github.com/boringsql/dryrun/internal/schema" +) + +// The bundle codec is the single chokepoint where a Bundle becomes bytes and +// back again. Every backend (filesystem today, OCI next) leans on it to produce +// byte-identical wire output, so the contract we actually care about is simple +// but load-bearing: whatever goes into EncodeBundle must come back out of +// DecodeBundle structurally unchanged, and the same input must always produce +// the same bytes. These tests pin that contract down. + +// fullBundle assembles a Bundle carrying all three snapshot kinds, with two +// distinct activity nodes so the map-valued Activity field is exercised with +// more than one entry — single-entry maps have a way of hiding ordering and +// key-handling bugs. We reuse the package's existing fixtures so the shapes +// stay in lockstep with the rest of the store tests. +func fullBundle() *Bundle { + return &Bundle{ + Schema: testSnapshot("schema-hash-abc123", "appdb"), + Planner: plannerFixture("schema-hash-abc123", "planner-hash-def456", "appdb"), + Activity: map[string]*schema.ActivityStatsSnapshot{ + "primary": activityFixture("schema-hash-abc123", "activity-hash-primary", "primary", false), + "replica": activityFixture("schema-hash-abc123", "activity-hash-replica", "replica", true), + }, + } +} + +// Full round-trip: a Bundle carrying schema + planner + two activity nodes +// should survive marshal -> zstd -> unzstd -> unmarshal and come back +// deep-equal to what we started with. If JSON tags drift, a field stops being +// exported, or compression mangles something, this is the test that screams. +func TestBundleCodec_RoundTrip(t *testing.T) { + original := fullBundle() + + raw, err := EncodeBundle(original) + if err != nil { + t.Fatalf("encode bundle: %v", err) + } + if len(raw) == 0 { + t.Fatal("encode produced zero bytes") + } + + decoded, err := DecodeBundle(raw) + if err != nil { + t.Fatalf("decode bundle: %v", err) + } + + // reflect.DeepEqual is the right hammer here: the fixtures build their + // time.Time values via time.Now().UTC(), which carries no monotonic clock + // reading and lives in the UTC location, so a JSON round-trip (RFC3339Nano, + // also UTC) reproduces them field-for-field. No timezone or monotonic skew + // to trip the comparison. + if !reflect.DeepEqual(original, decoded) { + t.Errorf("round-trip mismatch:\n original = %+v\n decoded = %+v", original, decoded) + } +} + +// Encoding the same bundle twice must yield byte-identical output. This is the +// property that makes registry-side dedup work for OCIStore: the layer digest +// is computed over these bytes, so any nondeterminism (map iteration leaking +// into output, compression dictionary drift) would silently defeat dedup by +// producing a "new" blob for unchanged content. The two-activity-node bundle is +// the important case — Go randomizes map iteration, so an encoder that walked +// the map without a stable key order would diverge here. +func TestBundleCodec_Deterministic(t *testing.T) { + b := fullBundle() + + first, err := EncodeBundle(b) + if err != nil { + t.Fatalf("first encode: %v", err) + } + second, err := EncodeBundle(b) + if err != nil { + t.Fatalf("second encode: %v", err) + } + if !reflect.DeepEqual(first, second) { + t.Errorf("encode is nondeterministic: two encodings of the same bundle differ (%d vs %d bytes)", len(first), len(second)) + } +} + +// DecodeBundle promises that a nil Activity map comes back as an empty-but- +// non-nil map. Callers index Activity directly (b.Activity[node] = ...) during +// the planner/activity merge, so a nil map would panic on write. We encode a +// schema-only bundle (no activity at all) and assert the normalization kicked +// in on the way back out. +func TestBundleCodec_NilActivityNormalized(t *testing.T) { + b := &Bundle{ + Schema: testSnapshot("schema-only", "appdb"), + // Activity intentionally left nil. + } + + raw, err := EncodeBundle(b) + if err != nil { + t.Fatalf("encode: %v", err) + } + decoded, err := DecodeBundle(raw) + if err != nil { + t.Fatalf("decode: %v", err) + } + + if decoded.Activity == nil { + t.Fatal("decoded Activity is nil; expected an empty non-nil map so callers can index without a nil check") + } + if len(decoded.Activity) != 0 { + t.Errorf("decoded Activity should be empty, got %d entries", len(decoded.Activity)) + } +} + +// Garbage in must not look like success. DecodeBundle first runs the bytes +// through the zstd reader, so a payload that isn't a valid zstd frame should +// surface a clean decompression error rather than a panic or a zero-value +// Bundle masquerading as real data. This is the on-disk / on-registry +// corruption guard at the codec layer. +func TestBundleCodec_DecodeGarbage(t *testing.T) { + if _, err := DecodeBundle([]byte("this is definitely not a zstd frame")); err == nil { + t.Fatal("decoding garbage bytes succeeded; expected an error") + } + + // An empty input is its own little edge case — no frame at all — and should + // likewise error rather than yielding a usable bundle. + if _, err := DecodeBundle(nil); err == nil { + t.Fatal("decoding nil bytes succeeded; expected an error") + } +} diff --git a/internal/history/filesystem_layout.go b/internal/history/filesystem_layout.go index 8cb14a4..257e421 100644 --- a/internal/history/filesystem_layout.go +++ b/internal/history/filesystem_layout.go @@ -2,6 +2,7 @@ package history import ( "fmt" + "path" "path/filepath" "strings" "time" @@ -13,8 +14,13 @@ const ( bundleExtension = ".json.zst" ) +// shared by BundleDir and the default remote stream so the two can't drift +func StreamSuffix(key SnapshotKey) string { + return path.Join(string(key.ProjectID), string(key.DatabaseID)) +} + func BundleDir(root string, key SnapshotKey) string { - return filepath.Join(root, string(key.ProjectID), string(key.DatabaseID)) + return filepath.Join(root, filepath.FromSlash(StreamSuffix(key))) } func BundleFilename(ts time.Time, contentHash string) string { diff --git a/internal/history/filesystem_store.go b/internal/history/filesystem_store.go index 0a18fd6..d942257 100644 --- a/internal/history/filesystem_store.go +++ b/internal/history/filesystem_store.go @@ -2,7 +2,6 @@ package history import ( "context" - "encoding/json" "errors" "fmt" "io/fs" @@ -12,8 +11,6 @@ import ( "sync" "time" - "github.com/klauspost/compress/zstd" - "github.com/boringsql/dryrun/internal/schema" ) @@ -170,48 +167,55 @@ func (f *FilesystemStore) List(ctx context.Context, key SnapshotKey, kind Snapsh var out []SnapshotSummary for _, b := range bundles { - switch kind.Tag { - case KindSchema: - s := b.Schema - if !inRange(s.Timestamp, rng) { - continue - } + ss, err := bundleSummaries(b, kind, rng) + if err != nil { + return nil, err + } + out = append(out, ss...) + } + sort.Slice(out, func(i, j int) bool { return out[i].Timestamp.After(out[j].Timestamp) }) + return out, nil +} + +// per-bundle summaries for one kind; shared by FilesystemStore and OCIStore so +// the two backends report identically +func bundleSummaries(b *Bundle, kind SnapshotKind, rng TimeRange) ([]SnapshotSummary, error) { + var out []SnapshotSummary + switch kind.Tag { + case KindSchema: + s := b.Schema + if inRange(s.Timestamp, rng) { out = append(out, SnapshotSummary{ Kind: SchemaKind(), Timestamp: s.Timestamp, ContentHash: s.ContentHash, SchemaRefHash: s.ContentHash, Database: s.Database, }) - case KindPlanner: - if b.Planner == nil { - continue - } - if !inRange(b.Planner.Timestamp, rng) { - continue - } + } + case KindPlanner: + if b.Planner != nil && inRange(b.Planner.Timestamp, rng) { out = append(out, SnapshotSummary{ Kind: PlannerKind(), Timestamp: b.Planner.Timestamp, ContentHash: b.Planner.ContentHash, SchemaRefHash: b.Planner.SchemaRefHash, Database: b.Planner.Database, }) - case KindActivity: - for label, a := range b.Activity { - if kind.NodeLabel != "" && kind.NodeLabel != label { - continue - } - if !inRange(a.Node.Timestamp, rng) { - continue - } - out = append(out, SnapshotSummary{ - Kind: ActivityKind(label), Timestamp: a.Node.Timestamp, - ContentHash: a.ContentHash, SchemaRefHash: a.SchemaRefHash, - NodeLabel: label, - }) + } + case KindActivity: + for label, a := range b.Activity { + if kind.NodeLabel != "" && kind.NodeLabel != label { + continue } - default: - return nil, fmt.Errorf("unknown SnapshotKind tag: %d", kind.Tag) + if !inRange(a.Node.Timestamp, rng) { + continue + } + out = append(out, SnapshotSummary{ + Kind: ActivityKind(label), Timestamp: a.Node.Timestamp, + ContentHash: a.ContentHash, SchemaRefHash: a.SchemaRefHash, + NodeLabel: label, + }) } + default: + return nil, fmt.Errorf("unknown SnapshotKind tag: %d", kind.Tag) } - sort.Slice(out, func(i, j int) bool { return out[i].Timestamp.After(out[j].Timestamp) }) return out, nil } @@ -288,7 +292,10 @@ func (f *FilesystemStore) ListKinds(ctx context.Context, key SnapshotKey) ([]Sna if err != nil { return nil, err } + return bundleKinds(bundles), nil +} +func bundleKinds(bundles []*Bundle) []SnapshotKind { var hasSchema, hasPlanner bool labels := map[string]struct{}{} for _, b := range bundles { @@ -318,7 +325,7 @@ func (f *FilesystemStore) ListKinds(ctx context.Context, key SnapshotKey) ([]Sna for _, label := range sortedLabels { out = append(out, ActivityKind(label)) } - return out, nil + return out } func (f *FilesystemStore) ListKeys(_ context.Context) ([]SnapshotKey, error) { @@ -550,36 +557,18 @@ func readBundle(path string) (*Bundle, error) { if err != nil { return nil, err } - dec, err := zstd.NewReader(nil) - if err != nil { - return nil, err - } - defer dec.Close() - plain, err := dec.DecodeAll(raw, nil) + b, err := DecodeBundle(raw) if err != nil { - return nil, fmt.Errorf("decompress bundle %s: %w", path, err) - } - var b Bundle - if err := json.Unmarshal(plain, &b); err != nil { - return nil, fmt.Errorf("parse bundle %s: %w", path, err) + return nil, fmt.Errorf("%s: %w", path, err) } - if b.Activity == nil { - b.Activity = map[string]*schema.ActivityStatsSnapshot{} - } - return &b, nil + return b, nil } func writeBundleAtomic(path string, b *Bundle) error { - raw, err := json.Marshal(b) - if err != nil { - return err - } - enc, err := zstd.NewWriter(nil) + compressed, err := EncodeBundle(b) if err != nil { return err } - defer enc.Close() - compressed := enc.EncodeAll(raw, nil) dir := filepath.Dir(path) tmp, err := os.CreateTemp(dir, ".bundle-*.tmp") diff --git a/internal/history/oci_auth.go b/internal/history/oci_auth.go new file mode 100644 index 0000000..9d244a9 --- /dev/null +++ b/internal/history/oci_auth.go @@ -0,0 +1,51 @@ +package history + +import ( + "context" + "fmt" + "os" + + "oras.land/oras-go/v2/registry/remote" + "oras.land/oras-go/v2/registry/remote/auth" + "oras.land/oras-go/v2/registry/remote/credentials" + "oras.land/oras-go/v2/registry/remote/retry" +) + +type AuthConfig struct { + TokenEnv string +} + +// token_env wins; else docker creds (covers GAR/GHCR/ECR/Hub after login) +func NewAuthClient(cfg AuthConfig) (remote.Client, error) { + if cfg.TokenEnv != "" { + token := os.Getenv(cfg.TokenEnv) + if token == "" { + return nil, fmt.Errorf("oci auth: token_env %q is set but the variable is empty", cfg.TokenEnv) + } + return staticBearerClient(token), nil + } + + store, err := credentials.NewStoreFromDocker(credentials.StoreOptions{}) + if err != nil { + return nil, fmt.Errorf("oci auth: load docker credentials: %w", err) + } + c := &auth.Client{ + Client: retry.DefaultClient, + Cache: auth.NewCache(), + Credential: credentials.Credential(store), + } + c.SetUserAgent("dryrun") + return c, nil +} + +func staticBearerClient(token string) *auth.Client { + c := &auth.Client{ + Client: retry.DefaultClient, + Cache: auth.NewCache(), + Credential: func(context.Context, string) (auth.Credential, error) { + return auth.Credential{AccessToken: token}, nil + }, + } + c.SetUserAgent("dryrun") + return c +} diff --git a/internal/history/oci_store.go b/internal/history/oci_store.go new file mode 100644 index 0000000..b9603cd --- /dev/null +++ b/internal/history/oci_store.go @@ -0,0 +1,467 @@ +package history + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "sort" + "strings" + "time" + + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + oras "oras.land/oras-go/v2" + "oras.land/oras-go/v2/errdef" + "oras.land/oras-go/v2/registry/remote" + "oras.land/oras-go/v2/registry/remote/errcode" + + "github.com/boringsql/dryrun/internal/schema" +) + +const ( + mediaTypeBundle = "application/vnd.dryrun.bundle.v1+zstd" + artifactTypeSnapshot = "application/vnd.dryrun.snapshot.v1+json" +) + +// OCIStore persists each schema bundle as one OCI artifact (manifest + a single +// zstd layer). Planner/activity merge into the matching bundle by +// schema_ref_hash, mirroring FilesystemStore. +type ( + OCIStore struct { + base string + client remote.Client + plainHTTP bool + streamFor func(SnapshotKey) string + } + + OCIConfig struct { + Base string // registry + repo prefix, e.g. us-docker.pkg.dev/proj/dryrun + Client remote.Client + PlainHTTP bool + StreamFor func(SnapshotKey) string // default StreamSuffix + } + + ociBundle struct { + manifest ocispec.Descriptor + bundle *Bundle + } +) + +var _ SnapshotStore = (*OCIStore)(nil) + +func NewOCIStore(cfg OCIConfig) (*OCIStore, error) { + if cfg.Base == "" { + return nil, fmt.Errorf("oci store: empty base reference") + } + streamFor := cfg.StreamFor + if streamFor == nil { + streamFor = StreamSuffix + } + return &OCIStore{ + base: strings.TrimRight(cfg.Base, "/"), + client: cfg.Client, + plainHTTP: cfg.PlainHTTP, + streamFor: streamFor, + }, nil +} + +func (o *OCIStore) repo(key SnapshotKey) (*remote.Repository, error) { + ref := o.base + "/" + o.streamFor(key) + r, err := remote.NewRepository(ref) + if err != nil { + return nil, fmt.Errorf("oci store: bad reference %q: %w", ref, err) + } + r.Client = o.client + r.PlainHTTP = o.plainHTTP + return r, nil +} + +// version tag embeds ts+hash like the filesystem filename; ref tag is hash-only +// so planner/activity locate their schema bundle by schema_ref_hash alone +func versionTag(ts time.Time, contentHash string) string { + return fmt.Sprintf("%s-%s", ts.UTC().Format(bundleTimeLayout), contentHash) +} + +func refTag(schemaHash string) string { + return "ref-" + schemaHash +} + +func (o *OCIStore) Put(ctx context.Context, key SnapshotKey, snap StoredSnapshot) (PutOutcome, error) { + switch { + case snap.AsSchema() != nil: + return o.putSchema(ctx, key, snap.AsSchema()) + case snap.AsPlanner() != nil: + return o.putPlanner(ctx, key, snap.AsPlanner()) + case snap.AsActivity() != nil: + return o.putActivity(ctx, key, snap.AsActivity()) + } + return PutInserted, fmt.Errorf("empty StoredSnapshot") +} + +func (o *OCIStore) putSchema(ctx context.Context, key SnapshotKey, s *schema.SchemaSnapshot) (PutOutcome, error) { + repo, err := o.repo(key) + if err != nil { + return PutInserted, err + } + vtag := versionTag(s.Timestamp, s.ContentHash) + if _, ok, err := resolveTag(ctx, repo, vtag); err != nil { + return PutInserted, err + } else if ok { + return PutDeduped, nil + } + b := &Bundle{Schema: s, Activity: map[string]*schema.ActivityStatsSnapshot{}} + return o.pushTagged(ctx, repo, b) +} + +func (o *OCIStore) putPlanner(ctx context.Context, key SnapshotKey, p *schema.PlannerStatsSnapshot) (PutOutcome, error) { + repo, err := o.repo(key) + if err != nil { + return PutInserted, err + } + b, ok, err := o.findBySchemaRef(ctx, repo, p.SchemaRefHash) + if err != nil { + return PutInserted, err + } + if !ok { + return PutInserted, ErrOrphanSnapshot + } + if b.Planner != nil && b.Planner.ContentHash == p.ContentHash { + return PutDeduped, nil + } + b.Planner = p + return o.pushTagged(ctx, repo, b) +} + +func (o *OCIStore) putActivity(ctx context.Context, key SnapshotKey, a *schema.ActivityStatsSnapshot) (PutOutcome, error) { + repo, err := o.repo(key) + if err != nil { + return PutInserted, err + } + b, ok, err := o.findBySchemaRef(ctx, repo, a.SchemaRefHash) + if err != nil { + return PutInserted, err + } + if !ok { + return PutInserted, ErrOrphanSnapshot + } + if b.Activity == nil { + b.Activity = map[string]*schema.ActivityStatsSnapshot{} + } + if existing, ok := b.Activity[a.Node.Source]; ok && existing.ContentHash == a.ContentHash { + return PutDeduped, nil + } + b.Activity[a.Node.Source] = a + return o.pushTagged(ctx, repo, b) +} + +// merge re-pushes under the same (schema-keyed) tags; old manifest is left for +// registry cleanup +func (o *OCIStore) pushTagged(ctx context.Context, repo *remote.Repository, b *Bundle) (PutOutcome, error) { + man, err := o.pushBundle(ctx, repo, b) + if err != nil { + return PutInserted, err + } + if err := tagBundle(ctx, repo, man, b); err != nil { + return PutInserted, err + } + return PutInserted, nil +} + +func tagBundle(ctx context.Context, repo *remote.Repository, man ocispec.Descriptor, b *Bundle) error { + for _, t := range []string{ + versionTag(b.Schema.Timestamp, b.Schema.ContentHash), + refTag(b.Schema.ContentHash), + } { + if err := repo.Tag(ctx, man, t); err != nil { + return fmt.Errorf("oci store: tag %q: %w", t, err) + } + } + return nil +} + +func (o *OCIStore) pushBundle(ctx context.Context, repo *remote.Repository, b *Bundle) (ocispec.Descriptor, error) { + raw, err := EncodeBundle(b) + if err != nil { + return ocispec.Descriptor{}, err + } + layer := ocispec.Descriptor{ + MediaType: mediaTypeBundle, + Digest: digest.FromBytes(raw), + Size: int64(len(raw)), + } + if err := pushIfAbsent(ctx, repo, layer, raw); err != nil { + return ocispec.Descriptor{}, err + } + return oras.PackManifest(ctx, repo, oras.PackManifestVersion1_1, artifactTypeSnapshot, oras.PackManifestOptions{ + Layers: []ocispec.Descriptor{layer}, + // pin created to the snapshot ts so identical bundles pack to identical manifests + ManifestAnnotations: map[string]string{ocispec.AnnotationCreated: b.Schema.Timestamp.UTC().Format(time.RFC3339)}, + }) +} + +func (o *OCIStore) findBySchemaRef(ctx context.Context, repo *remote.Repository, schemaHash string) (*Bundle, bool, error) { + desc, ok, err := resolveTag(ctx, repo, refTag(schemaHash)) + if err != nil || !ok { + return nil, false, err + } + b, err := fetchBundle(ctx, repo, desc) + if err != nil { + return nil, false, err + } + return b, true, nil +} + +func pushIfAbsent(ctx context.Context, repo *remote.Repository, desc ocispec.Descriptor, data []byte) error { + ok, err := repo.Exists(ctx, desc) + if err != nil { + return err + } + if ok { + return nil + } + return repo.Push(ctx, desc, bytes.NewReader(data)) +} + +func resolveTag(ctx context.Context, repo *remote.Repository, tag string) (ocispec.Descriptor, bool, error) { + desc, err := repo.Resolve(ctx, tag) + if err != nil { + if errors.Is(err, errdef.ErrNotFound) { + return ocispec.Descriptor{}, false, nil + } + return ocispec.Descriptor{}, false, err + } + return desc, true, nil +} + +func fetchBundle(ctx context.Context, repo *remote.Repository, manifest ocispec.Descriptor) (*Bundle, error) { + mr, err := repo.Fetch(ctx, manifest) + if err != nil { + return nil, err + } + defer mr.Close() + mbytes, err := io.ReadAll(mr) + if err != nil { + return nil, err + } + var man ocispec.Manifest + if err := json.Unmarshal(mbytes, &man); err != nil { + return nil, fmt.Errorf("oci store: parse manifest: %w", err) + } + if len(man.Layers) == 0 { + return nil, fmt.Errorf("oci store: manifest has no layers") + } + lr, err := repo.Fetch(ctx, man.Layers[0]) + if err != nil { + return nil, err + } + defer lr.Close() + raw, err := io.ReadAll(lr) + if err != nil { + return nil, err + } + return DecodeBundle(raw) +} + +// inverse of versionTag; ref-* and other tags fail the time parse and are skipped +func parseVersionTag(tag string) (time.Time, string, bool) { + i := strings.IndexByte(tag, '-') + if i < 0 || i+1 >= len(tag) { + return time.Time{}, "", false + } + ts, err := time.Parse(bundleTimeLayout, tag[:i]) + if err != nil { + return time.Time{}, "", false + } + return ts, tag[i+1:], true +} + +// load fetches every version-tagged bundle newest-first, mirroring +// FilesystemStore.loadBundles so the pick*/summary helpers behave identically +func (o *OCIStore) load(ctx context.Context, key SnapshotKey) (*remote.Repository, []ociBundle, error) { + repo, err := o.repo(key) + if err != nil { + return nil, nil, err + } + var items []ociBundle + err = repo.Tags(ctx, "", func(tags []string) error { + for _, t := range tags { + if _, _, ok := parseVersionTag(t); !ok { + continue + } + desc, ok, err := resolveTag(ctx, repo, t) + if err != nil { + return err + } + if !ok { + continue + } + b, err := fetchBundle(ctx, repo, desc) + if err != nil { + return err + } + items = append(items, ociBundle{manifest: desc, bundle: b}) + } + return nil + }) + if err != nil { + // an absent repo (never pushed to) reads as empty, not an error + if isRepoAbsent(err) { + return repo, nil, nil + } + return nil, nil, err + } + sort.Slice(items, func(i, j int) bool { + return items[i].bundle.Schema.Timestamp.After(items[j].bundle.Schema.Timestamp) + }) + return repo, items, nil +} + +// a never-pushed repo answers tags/list with 404 NAME_UNKNOWN, not ErrNotFound +func isRepoAbsent(err error) bool { + if errors.Is(err, errdef.ErrNotFound) { + return true + } + var resp *errcode.ErrorResponse + return errors.As(err, &resp) && resp.StatusCode == http.StatusNotFound +} + +func (o *OCIStore) loadBundles(ctx context.Context, key SnapshotKey) ([]*Bundle, error) { + _, items, err := o.load(ctx, key) + if err != nil { + return nil, err + } + out := make([]*Bundle, len(items)) + for i, it := range items { + out[i] = it.bundle + } + return out, nil +} + +func (o *OCIStore) Get(ctx context.Context, key SnapshotKey, kind SnapshotKind, at SnapshotRef) (StoredSnapshot, error) { + bundles, err := o.loadBundles(ctx, key) + if err != nil { + return StoredSnapshot{}, err + } + switch kind.Tag { + case KindSchema: + b, err := pickSchemaBundle(bundles, at) + if err != nil { + return StoredSnapshot{}, err + } + return WrapSchema(b.Schema), nil + case KindPlanner: + b, err := pickPlannerBundle(bundles, at) + if err != nil { + return StoredSnapshot{}, err + } + return WrapPlanner(b.Planner), nil + case KindActivity: + a, err := pickActivity(bundles, kind.NodeLabel, at) + if err != nil { + return StoredSnapshot{}, err + } + return WrapActivity(a), nil + } + return StoredSnapshot{}, fmt.Errorf("unknown SnapshotKind tag: %d", kind.Tag) +} + +func (o *OCIStore) List(ctx context.Context, key SnapshotKey, kind SnapshotKind, rng TimeRange) ([]SnapshotSummary, error) { + bundles, err := o.loadBundles(ctx, key) + if err != nil { + return nil, err + } + var out []SnapshotSummary + for _, b := range bundles { + ss, err := bundleSummaries(b, kind, rng) + if err != nil { + return nil, err + } + out = append(out, ss...) + } + sort.Slice(out, func(i, j int) bool { return out[i].Timestamp.After(out[j].Timestamp) }) + return out, nil +} + +func (o *OCIStore) Latest(ctx context.Context, key SnapshotKey, kind SnapshotKind) (*SnapshotSummary, error) { + list, err := o.List(ctx, key, kind, TimeRange{}) + if err != nil || len(list) == 0 { + return nil, err + } + first := list[0] + return &first, nil +} + +func (o *OCIStore) DeleteBefore(ctx context.Context, key SnapshotKey, kind SnapshotKind, cutoff time.Time) (int64, error) { + if kind.Tag != KindSchema { + return 0, fmt.Errorf("oci store: DeleteBefore supports schema only, got %s", kind) + } + repo, items, err := o.load(ctx, key) + if err != nil { + return 0, err + } + var n int64 + for _, it := range items { + if it.bundle.Schema != nil && it.bundle.Schema.Timestamp.Before(cutoff) { + if err := repo.Delete(ctx, it.manifest); err != nil { + return n, err + } + n++ + } + } + return n, nil +} + +func (o *OCIStore) ListKinds(ctx context.Context, key SnapshotKey) ([]SnapshotKind, error) { + bundles, err := o.loadBundles(ctx, key) + if err != nil { + return nil, err + } + return bundleKinds(bundles), nil +} + +func (o *OCIStore) ListKeys(ctx context.Context) ([]SnapshotKey, error) { + host, prefix, ok := strings.Cut(o.base, "/") + if !ok { + return nil, fmt.Errorf("oci store: base %q has no repo path", o.base) + } + reg, err := remote.NewRegistry(host) + if err != nil { + return nil, err + } + reg.Client = o.client + reg.PlainHTTP = o.plainHTTP + + prefix += "/" + var out []SnapshotKey + err = reg.Repositories(ctx, "", func(repos []string) error { + for _, r := range repos { + suffix, ok := strings.CutPrefix(r, prefix) + if !ok { + continue + } + proj, db, ok := strings.Cut(suffix, "/") + if !ok || strings.Contains(db, "/") { + continue + } + out = append(out, SnapshotKey{ProjectID: ProjectId(proj), DatabaseID: DatabaseId(db)}) + } + return nil + }) + if err != nil { + if errors.Is(err, errdef.ErrNotFound) { + return nil, nil + } + return nil, err + } + sort.Slice(out, func(i, j int) bool { + if out[i].ProjectID != out[j].ProjectID { + return out[i].ProjectID < out[j].ProjectID + } + return out[i].DatabaseID < out[j].DatabaseID + }) + return out, nil +} diff --git a/internal/history/oci_store_integration_test.go b/internal/history/oci_store_integration_test.go new file mode 100644 index 0000000..3002219 --- /dev/null +++ b/internal/history/oci_store_integration_test.go @@ -0,0 +1,280 @@ +//go:build integration + +package history + +import ( + "context" + "fmt" + "os" + "testing" + "time" +) + +// This is the registry-backed half of the OCIStore tests. It only runs under +// `go test -tags integration` AND when DRYRUN_TEST_REGISTRY points at a live, +// push-able OCI registry (e.g. a local `registry:2` or `zot` over plain HTTP): +// +// docker run -d -p 5000:5000 registry:2 +// DRYRUN_TEST_REGISTRY=localhost:5000 go test -tags integration ./internal/history/ +// +// Without that env var the whole file skips, so CI that hasn't provisioned a +// registry stays green. We talk to the registry anonymously over plain HTTP: +// OCIStore leaves remote.Repository.Client nil, which makes oras-go fall back +// to its default (anonymous) client, exactly what an unauthenticated local +// registry wants. + +// newIntegrationStore builds an OCIStore against the test registry, rooted at a +// per-run-unique repo prefix so repeated runs (and parallel packages) never +// collide on tags. The uniqueness comes from a nanosecond stamp baked into the +// project segment of every key's stream — see uniqueKey below. +func newIntegrationStore(t *testing.T) *OCIStore { + t.Helper() + addr := os.Getenv("DRYRUN_TEST_REGISTRY") + if addr == "" { + t.Skip("DRYRUN_TEST_REGISTRY not set; skipping OCI registry integration test") + } + store, err := NewOCIStore(OCIConfig{ + Base: addr + "/dryrun-test", + PlainHTTP: true, // local registry:2 / zot listen on http, not https + }) + if err != nil { + t.Fatalf("NewOCIStore: %v", err) + } + return store +} + +// uniqueKey namespaces a (project, database) pair under a per-run stamp so the +// registry repo path is fresh every run. Reusing a path across runs would let a +// previous run's tags leak into this run's List/Latest assertions. +func uniqueKey(stamp int64, project, database string) SnapshotKey { + return key(fmt.Sprintf("run-%d-%s", stamp, project), database) +} + +// TestOCIStoreConformance drives the same put/get/list/latest/delete/dedup +// contract the SQLite and filesystem stores are held to, but against a real +// registry. If OCIStore's tag scheme, manifest annotations, or merge logic +// diverge from the shared SnapshotStore semantics, one of these sub-tests +// breaks. +func TestOCIStoreConformance(t *testing.T) { + store := newIntegrationStore(t) + ctx := context.Background() + stamp := time.Now().UnixNano() + + t.Run("PutGetSchema", func(t *testing.T) { + k := uniqueKey(stamp, "acme", "primary") + s := testSnapshot("sh-1", "appdb") + if o, err := store.Put(ctx, k, WrapSchema(s)); err != nil || o != PutInserted { + t.Fatalf("put schema: got (%v, %v), want PutInserted", o, err) + } + got, err := store.Get(ctx, k, SchemaKind(), NewRefHash("sh-1")) + if err != nil || got.AsSchema() == nil || got.AsSchema().ContentHash != "sh-1" { + t.Fatalf("get schema: got %+v err=%v", got.AsSchema(), err) + } + }) + + t.Run("SchemaDedup", func(t *testing.T) { + k := uniqueKey(stamp, "dedup", "primary") + s := testSnapshot("dup-hash", "appdb") + if o, err := store.Put(ctx, k, WrapSchema(s)); err != nil || o != PutInserted { + t.Fatalf("first put: got (%v, %v)", o, err) + } + // Same content hash a second time must short-circuit to PutDeduped — the + // version tag already resolves, so no new manifest is pushed. + if o, err := store.Put(ctx, k, WrapSchema(s)); err != nil || o != PutDeduped { + t.Fatalf("second put: got (%v, %v), want PutDeduped", o, err) + } + }) + + t.Run("PlannerActivityMergeIntoBundle", func(t *testing.T) { + k := uniqueKey(stamp, "merge", "primary") + s := testSnapshot("sh-merge", "appdb") + if _, err := store.Put(ctx, k, WrapSchema(s)); err != nil { + t.Fatalf("put schema: %v", err) + } + // planner and activity locate the schema bundle by schema_ref_hash, + // pull-merge-repush; both must then be retrievable from the same stream. + p := plannerFixture("sh-merge", "pl-1", "appdb") + if _, err := store.Put(ctx, k, WrapPlanner(p)); err != nil { + t.Fatalf("put planner: %v", err) + } + a := activityFixture("sh-merge", "ac-1", "primary", false) + if _, err := store.Put(ctx, k, WrapActivity(a)); err != nil { + t.Fatalf("put activity: %v", err) + } + + gotP, err := store.Get(ctx, k, PlannerKind(), NewRefHash("pl-1")) + if err != nil || gotP.AsPlanner().ContentHash != "pl-1" { + t.Errorf("get planner: got %+v err=%v", gotP.AsPlanner(), err) + } + gotA, err := store.Get(ctx, k, ActivityKind("primary"), NewRefHash("ac-1")) + if err != nil || gotA.AsActivity().ContentHash != "ac-1" { + t.Errorf("get activity: got %+v err=%v", gotA.AsActivity(), err) + } + }) + + t.Run("OrphanPlannerRejected", func(t *testing.T) { + k := uniqueKey(stamp, "orphan", "primary") + // No schema bundle pushed first, so the planner has nothing to bind to. + p := plannerFixture("no-such-schema", "pl-x", "appdb") + if _, err := store.Put(ctx, k, WrapPlanner(p)); err == nil { + t.Error("put orphan planner: want error, got nil") + } + }) + + t.Run("ListLatestAndTimeRange", func(t *testing.T) { + k := uniqueKey(stamp, "history", "primary") + now := time.Now().UTC().Truncate(time.Second) + mk := func(hash string, offset time.Duration) { + s := testSnapshot(hash, "appdb") + s.Timestamp = now.Add(offset) + if _, err := store.Put(ctx, k, WrapSchema(s)); err != nil { + t.Fatalf("put %s: %v", hash, err) + } + } + mk("h-old", -2*time.Hour) + mk("h-mid", -1*time.Hour) + mk("h-new", 0) + + list, err := store.List(ctx, k, SchemaKind(), TimeRange{}) + if err != nil { + t.Fatalf("list: %v", err) + } + if len(list) != 3 { + t.Fatalf("list len: got %d, want 3", len(list)) + } + // List is newest-first, same as every other backend. + if list[0].ContentHash != "h-new" { + t.Errorf("list[0]: got %q, want h-new", list[0].ContentHash) + } + + latest, err := store.Latest(ctx, k, SchemaKind()) + if err != nil || latest == nil || latest.ContentHash != "h-new" { + t.Fatalf("latest: got %+v err=%v, want h-new", latest, err) + } + + // half-open window [mid-ε, new): should surface exactly h-mid. + from := now.Add(-time.Hour - time.Minute) + to := now.Add(-30 * time.Minute) + windowed, err := store.List(ctx, k, SchemaKind(), TimeRange{From: &from, To: &to}) + if err != nil { + t.Fatalf("windowed list: %v", err) + } + if len(windowed) != 1 || windowed[0].ContentHash != "h-mid" { + t.Errorf("windowed list: got %+v, want [h-mid]", windowed) + } + }) + + t.Run("DeleteBeforeCutoff", func(t *testing.T) { + k := uniqueKey(stamp, "retention", "primary") + now := time.Now().UTC().Truncate(time.Second) + old := testSnapshot("h-old", "appdb") + old.Timestamp = now.Add(-24 * time.Hour) + fresh := testSnapshot("h-new", "appdb") + fresh.Timestamp = now + if _, err := store.Put(ctx, k, WrapSchema(old)); err != nil { + t.Fatal(err) + } + if _, err := store.Put(ctx, k, WrapSchema(fresh)); err != nil { + t.Fatal(err) + } + + deleted, err := store.DeleteBefore(ctx, k, SchemaKind(), now.Add(-time.Hour)) + if err != nil { + t.Fatalf("delete before: %v", err) + } + if deleted != 1 { + t.Errorf("deleted: got %d, want 1", deleted) + } + list, err := store.List(ctx, k, SchemaKind(), TimeRange{}) + if err != nil { + t.Fatal(err) + } + if len(list) != 1 || list[0].ContentHash != "h-new" { + t.Errorf("survivors: got %+v, want [h-new]", list) + } + }) + + t.Run("KeyIsolation", func(t *testing.T) { + // Identical content under two different streams must not dedup against + // each other — each stream is its own repository. + k1 := uniqueKey(stamp, "iso", "primary") + k2 := uniqueKey(stamp, "iso", "replica") + s := testSnapshot("same-hash", "appdb") + if _, err := store.Put(ctx, k1, WrapSchema(s)); err != nil { + t.Fatal(err) + } + if o, err := store.Put(ctx, k2, WrapSchema(s)); err != nil || o != PutInserted { + t.Fatalf("put under second key: got (%v, %v), want PutInserted", o, err) + } + for _, k := range []SnapshotKey{k1, k2} { + list, err := store.List(ctx, k, SchemaKind(), TimeRange{}) + if err != nil { + t.Fatal(err) + } + if len(list) != 1 { + t.Errorf("key %+v: got %d rows, want 1", k, len(list)) + } + } + }) +} + +// TestOCIStoreCrossStoreParity is the round-trip that proves the OCI backend is +// wire-compatible with the filesystem one: seed a FilesystemStore, sync each +// kind into OCIStore, sync back into a second FilesystemStore, and assert the +// content hashes survive unchanged. content_hash is the identity of a snapshot, +// so equal hashes end-to-end means the bytes never drifted in transit. +func TestOCIStoreCrossStoreParity(t *testing.T) { + oci := newIntegrationStore(t) + ctx := context.Background() + stamp := time.Now().UnixNano() + k := uniqueKey(stamp, "parity", "primary") + + src, _ := testFsStore(t) + dst, _ := testFsStore(t) + + // Seed the source filesystem store with one of each kind, all bound to the + // same schema_ref_hash so planner/activity have a schema to merge into. + s := testSnapshot("sh-parity", "appdb") + p := plannerFixture("sh-parity", "pl-parity", "appdb") + a := activityFixture("sh-parity", "ac-parity", "primary", false) + for _, snap := range []StoredSnapshot{WrapSchema(s), WrapPlanner(p), WrapActivity(a)} { + if _, err := src.Put(ctx, k, snap); err != nil { + t.Fatalf("seed src: %v", err) + } + } + + // Hop: filesystem -> OCI -> filesystem. Schema must lead so the merges land. + copyKind := func(from, to SnapshotStore, kind SnapshotKind, ref SnapshotRef) { + got, err := from.Get(ctx, k, kind, ref) + if err != nil { + t.Fatalf("get %v from source: %v", kind, err) + } + if _, err := to.Put(ctx, k, got); err != nil { + t.Fatalf("put %v into dest: %v", kind, err) + } + } + for _, hop := range []struct { + from, to SnapshotStore + }{ + {src, oci}, + {oci, dst}, + } { + copyKind(hop.from, hop.to, SchemaKind(), NewRefHash("sh-parity")) + copyKind(hop.from, hop.to, PlannerKind(), NewRefHash("pl-parity")) + copyKind(hop.from, hop.to, ActivityKind("primary"), NewRefHash("ac-parity")) + } + + // The hashes that came out the far end must equal what we put in. + gotS, err := dst.Get(ctx, k, SchemaKind(), NewRefHash("sh-parity")) + if err != nil || gotS.AsSchema().ContentHash != s.ContentHash { + t.Errorf("schema parity: got %+v err=%v, want hash %s", gotS.AsSchema(), err, s.ContentHash) + } + gotP, err := dst.Get(ctx, k, PlannerKind(), NewRefHash("pl-parity")) + if err != nil || gotP.AsPlanner().ContentHash != p.ContentHash { + t.Errorf("planner parity: got %+v err=%v, want hash %s", gotP.AsPlanner(), err, p.ContentHash) + } + gotA, err := dst.Get(ctx, k, ActivityKind("primary"), NewRefHash("ac-parity")) + if err != nil || gotA.AsActivity().ContentHash != a.ContentHash { + t.Errorf("activity parity: got %+v err=%v, want hash %s", gotA.AsActivity(), err, a.ContentHash) + } +} diff --git a/internal/history/oci_store_test.go b/internal/history/oci_store_test.go new file mode 100644 index 0000000..4e59aa7 --- /dev/null +++ b/internal/history/oci_store_test.go @@ -0,0 +1,117 @@ +package history + +import ( + "testing" + "time" +) + +// These are the registry-free unit tests for OCIStore: the pure string/tag +// plumbing and constructor wiring we can exercise without standing up an actual +// OCI registry. The full put/get/list conformance suite lives in +// oci_store_integration_test.go behind the `integration` build tag, because it +// genuinely needs a registry to talk to (OCIStore drives a *remote.Repository, +// not an injectable in-memory target). Keeping the fast, hermetic checks here +// means a plain `go test ./internal/history/` still covers the tag scheme and +// repo-path mapping that everything else is built on top of. + +// The version tag is how a snapshot is named in the registry, and List/Latest +// reconstruct timestamps by parsing it back. So versionTag and parseVersionTag +// have to be exact inverses — encode a (timestamp, hash) pair and we must get +// the same pair back out, with the timestamp landing on the dot in UTC. +func TestOCIVersionTag_RoundTrip(t *testing.T) { + ts, err := time.Parse(time.RFC3339, "2024-06-01T12:30:45Z") + if err != nil { + t.Fatalf("parse timestamp: %v", err) + } + hash := "abc123def456" + + tag := versionTag(ts, hash) + + gotTS, gotHash, ok := parseVersionTag(tag) + if !ok { + t.Fatalf("parseVersionTag(%q) failed to parse a tag we just produced", tag) + } + if !gotTS.Equal(ts) { + t.Errorf("timestamp round-trip: got %v, want %v", gotTS, ts) + } + if gotHash != hash { + t.Errorf("hash round-trip: got %q, want %q", gotHash, hash) + } +} + +// parseVersionTag is also the filter that decides which tags in a repo are real +// snapshots. A repo holds version tags AND the hash-only `ref-` pointer +// tags used to locate a schema bundle during planner/activity merges — plus +// whatever junk a registry might surface. Only well-formed `-` tags +// should parse; everything else must be rejected so it gets skipped during a +// list rather than crashing or inventing a zero-time snapshot. +func TestOCIParseVersionTag_RejectsNonVersionTags(t *testing.T) { + cases := []struct { + name string + tag string + }{ + {"ref pointer tag", refTag("abc123")}, // ref-abc123: "ref" is not a timestamp + {"moving latest tag", "latest"}, // no dash at all + {"empty string", ""}, // nothing to parse + {"dash with no hash", "20240601T123000Z-"}, // valid ts but empty hash half + {"garbage before dash", "notatime-abc"}, // has a dash, but the left half isn't a timestamp + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if _, _, ok := parseVersionTag(tc.tag); ok { + t.Errorf("parseVersionTag(%q) parsed successfully; expected it to be rejected", tc.tag) + } + }) + } +} + +// NewOCIStore is the wiring seam: it must reject a missing base outright (an +// empty registry reference is never recoverable) and, when the caller doesn't +// supply a stream mapper, default to StreamSuffix so the OCI repo path lines up +// with the on-disk BundleDir layout. It should also tolerate a trailing slash +// on the base rather than producing a `//` in the final ref. +func TestNewOCIStore_DefaultsAndValidation(t *testing.T) { + // Empty base is a hard error — there's nothing sensible to point at. + if _, err := NewOCIStore(OCIConfig{}); err == nil { + t.Fatal("NewOCIStore with empty Base succeeded; expected an error") + } + + // A base with a trailing slash should be normalized, not passed through. + store, err := NewOCIStore(OCIConfig{Base: "reg.example.com/proj/dryrun/"}) + if err != nil { + t.Fatalf("NewOCIStore: %v", err) + } + if store.base != "reg.example.com/proj/dryrun" { + t.Errorf("base not trimmed: got %q", store.base) + } + + // With no StreamFor supplied, the store must fall back to StreamSuffix so + // the default remote layout matches the shared local definition exactly. + k := key("acme", "primary") + if got, want := store.streamFor(k), StreamSuffix(k); got != want { + t.Errorf("default streamFor: got %q, want %q (should match StreamSuffix)", got, want) + } +} + +// repo() composes the full registry reference from base + stream. It builds a +// remote.Repository, which parses (but does not contact) the reference, so this +// stays hermetic. We assert the composed reference is what we expect and that a +// custom StreamFor flows through instead of the default — that override is what +// makes the shared-stream feature (two projects -> one repo) work. +func TestOCIStore_RepoReference(t *testing.T) { + store, err := NewOCIStore(OCIConfig{ + Base: "reg.example.com/proj/dryrun", + StreamFor: func(SnapshotKey) string { return "shared/auth" }, + }) + if err != nil { + t.Fatalf("NewOCIStore: %v", err) + } + + repo, err := store.repo(key("ignored", "ignored")) + if err != nil { + t.Fatalf("repo(): %v", err) + } + if got, want := repo.Reference.String(), "reg.example.com/proj/dryrun/shared/auth"; got != want { + t.Errorf("repo reference: got %q, want %q", got, want) + } +}