Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions internal/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,72 @@ func StreamTo(ctx context.Context, w io.Writer, directory string, excludePattern
return errors.Join(errs...)
}

// CreateSubdir archives a single named subdirectory within parentDir using tar+zstd,
// producing an archive whose paths start with "./{subdir}/...". This allows the
// client to extract the archive into the parent-equivalent directory and have the
// subdirectory land in the correct location.
//
// Example: CreateSubdir(ctx, cache, key, "/repo/.git", "lfs", ttl) creates an
// archive with paths like "./lfs/objects/xx/yy/sha256". The client extracts it
// into the ".git/" directory to get ".git/lfs/objects/xx/yy/sha256".
func CreateSubdir(ctx context.Context, remote cache.Cache, key cache.Key, parentDir, subdir string, ttl time.Duration) error {
subdirPath := filepath.Join(parentDir, subdir)
if info, err := os.Stat(subdirPath); err != nil {
return errors.Wrap(err, "failed to stat subdir")
} else if !info.IsDir() {
return errors.Errorf("not a directory: %s", subdirPath)
}

headers := make(http.Header)
headers.Set("Content-Type", "application/zstd")
headers.Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", subdir+".tar.zst"))

wc, err := remote.Create(ctx, key, headers, ttl)
if err != nil {
return errors.Wrap(err, "failed to create object")
}

// tar from parentDir, archiving only the named subdir so paths are ./subdir/...
tarCmd := exec.CommandContext(ctx, "tar", "-cpf", "-", "-C", parentDir, "--", subdir)
zstdCmd := exec.CommandContext(ctx, "zstd", "-c", "-T4")

tarStdout, err := tarCmd.StdoutPipe()
if err != nil {
return errors.Join(errors.Wrap(err, "failed to create tar stdout pipe"), wc.Close())
}

var tarStderr, zstdStderr bytes.Buffer
tarCmd.Stderr = &tarStderr

zstdCmd.Stdin = tarStdout
zstdCmd.Stdout = wc
zstdCmd.Stderr = &zstdStderr

if err := tarCmd.Start(); err != nil {
return errors.Join(errors.Wrap(err, "failed to start tar"), wc.Close())
}
if err := zstdCmd.Start(); err != nil {
return errors.Join(errors.Wrap(err, "failed to start zstd"), tarCmd.Wait(), wc.Close())
}

tarErr := tarCmd.Wait()
zstdErr := zstdCmd.Wait()
closeErr := wc.Close()

var errs []error
if tarErr != nil {
errs = append(errs, errors.Errorf("tar failed: %w: %s", tarErr, tarStderr.String()))
}
if zstdErr != nil {
errs = append(errs, errors.Errorf("zstd failed: %w: %s", zstdErr, zstdStderr.String()))
}
if closeErr != nil {
errs = append(errs, errors.Wrap(closeErr, "failed to close writer"))
}

return errors.Join(errs...)
}

// Restore downloads an archive from the cache and extracts it to a directory.
//
// The archive is decompressed with zstd and extracted with tar, preserving
Expand Down
12 changes: 12 additions & 0 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Config struct {
SnapshotInterval time.Duration `hcl:"snapshot-interval,optional" help:"How often to generate tar.zstd workstation snapshots. 0 disables snapshots." default:"0"`
MirrorSnapshotInterval time.Duration `hcl:"mirror-snapshot-interval,optional" help:"How often to generate mirror snapshots for pod bootstrap. 0 uses snapshot-interval. Defaults to 6h." default:"6h"`
RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 0 disables." default:"0"`
LFSSnapshotEnabled bool `hcl:"lfs-snapshot-enabled,optional" help:"When true, also generate a separate LFS object snapshot at /git/{repo}/lfs-snapshot.tar.zst on each snapshot interval. Requires git-lfs and a configured GitHub App." default:"false"`
// ServerURL is embedded as remote.origin.url in snapshots so git pull goes through cachew.
ServerURL string `hcl:"server-url,optional" help:"Base URL of this cachew instance, embedded in snapshot remote URLs." default:"${CACHEW_URL}"`
ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression (0 = all CPU cores)." default:"0"`
Expand Down Expand Up @@ -151,6 +152,9 @@ func New(

if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
if s.config.LFSSnapshotEnabled {
s.scheduleLFSSnapshotJobs(repo)
}
}
if s.config.RepackInterval > 0 {
s.scheduleRepackJobs(repo)
Expand Down Expand Up @@ -219,6 +223,11 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
return
}

if strings.HasSuffix(pathValue, "/lfs-snapshot.tar.zst") {
s.handleLFSSnapshotRequest(w, r, host, pathValue)
return
}

service := r.URL.Query().Get("service")
isReceivePack := service == "git-receive-pack" || strings.HasSuffix(pathValue, "/git-receive-pack")

Expand Down Expand Up @@ -505,6 +514,9 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {

if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
if s.config.LFSSnapshotEnabled {
s.scheduleLFSSnapshotJobs(repo)
}
}
if s.config.RepackInterval > 0 {
s.scheduleRepackJobs(repo)
Expand Down
170 changes: 170 additions & 0 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"os/exec"
Expand Down Expand Up @@ -413,3 +414,172 @@ func snapshotSpoolDirForURL(mirrorRoot, upstreamURL string) (string, error) {
}
return filepath.Join(mirrorRoot, ".snapshot-spools", repoPath), nil
}

// generateAndUploadLFSSnapshot fetches the LFS objects for the repository's default
// branch and archives them as a separate tar.zst served at /git/{repo}/lfs-snapshot.tar.zst.
//
// The archive stores paths relative to .git/ (e.g. ./lfs/objects/xx/yy/sha256) so that
// the client can extract it directly into the repo's .git/ directory.
//
// This is called on the same schedule as generateAndUploadSnapshot so the LFS archive
// stays current with the mirror. It requires the GitHub App token manager to be
// configured so that git-lfs can authenticate with GitHub when fetching objects not
// already present in the mirror's .git/lfs/ store.
func (s *Strategy) generateAndUploadLFSSnapshot(ctx context.Context, repo *gitclone.Repository) error {
logger := logging.FromContext(ctx)
upstream := repo.UpstreamURL()

// Verify git-lfs is available before doing any work.
if _, err := exec.LookPath("git-lfs"); err != nil {
logger.WarnContext(ctx, "git-lfs not found, skipping LFS snapshot",
slog.String("upstream", upstream))
return nil
}

logger.InfoContext(ctx, "LFS snapshot generation started", slog.String("upstream", upstream))

mirrorRoot := s.cloneManager.Config().MirrorRoot
snapshotDir, err := snapshotDirForURL(mirrorRoot, upstream)
if err != nil {
return err
}
lfsWorkDir := snapshotDir + "-lfs"

if err := os.RemoveAll(lfsWorkDir); err != nil {
return errors.Wrap(err, "remove previous LFS work dir")
}
if err := os.MkdirAll(filepath.Dir(lfsWorkDir), 0o750); err != nil {
return errors.Wrap(err, "create LFS work dir parent")
}

// Clone the mirror into a working directory under a read lock.
if err := repo.WithReadLock(func() error {
// #nosec G204
cmd := exec.CommandContext(ctx, "git", "clone", repo.Path(), lfsWorkDir)
if output, err := cmd.CombinedOutput(); err != nil {
return errors.Wrapf(err, "git clone for LFS snapshot: %s", string(output))
}
return nil
}); err != nil {
_ = os.RemoveAll(lfsWorkDir)
return errors.WithStack(err)
}

defer func() {
if rmErr := os.RemoveAll(lfsWorkDir); rmErr != nil {
logger.WarnContext(ctx, "Failed to clean up LFS work dir",
slog.String("error", rmErr.Error()))
}
}()

// Restore the upstream URL for LFS fetch. git-lfs contacts the LFS server
// at {remote.origin.url}/info/lfs; we must point it at GitHub, not cachew.
// #nosec G204
if output, err := exec.CommandContext(ctx, "git", "-C", lfsWorkDir,
"remote", "set-url", "origin", upstream).CombinedOutput(); err != nil {
return errors.Wrapf(err, "restore LFS origin URL: %s", string(output))
}

// Inject a GitHub App token so git-lfs can authenticate when downloading
// LFS objects that are not already in the local object store.
fetchEnv := os.Environ()
if s.tokenManager != nil {
// Extract org from upstream URL, e.g. "block" from "https://github.com/block/repo".
org := orgFromUpstreamURL(upstream)
if org != "" {
token, tokenErr := s.tokenManager.GetTokenForOrg(ctx, org)
if tokenErr != nil {
logger.WarnContext(ctx, "Failed to get GitHub token for LFS fetch — proceeding unauthenticated",
slog.String("org", org), slog.String("error", tokenErr.Error()))
} else if token != "" {
// Embed credentials in the remote URL for this working directory only.
authedURL := fmt.Sprintf("https://x-access-token:%s@github.com/%s",
token, strings.TrimPrefix(upstream, "https://github.com/"))
// #nosec G204
if output, err := exec.CommandContext(ctx, "git", "-C", lfsWorkDir,
"remote", "set-url", "origin", authedURL).CombinedOutput(); err != nil {
return errors.Wrapf(err, "set authed LFS origin URL: %s", string(output))
}
}
}
}

// Fetch all LFS objects for HEAD (the default branch).
// #nosec G204
fetchCmd := exec.CommandContext(ctx, "git", "-C", lfsWorkDir, "lfs", "fetch", "origin", "HEAD")
fetchCmd.Env = fetchEnv
if output, err := fetchCmd.CombinedOutput(); err != nil {
return errors.Wrapf(err, "git lfs fetch: %s", string(output))
}

// Check whether any LFS objects were actually downloaded.
lfsDir := filepath.Join(lfsWorkDir, ".git", "lfs")
if _, err := os.Stat(lfsDir); os.IsNotExist(err) {
logger.InfoContext(ctx, "No LFS objects in repository, skipping LFS snapshot",
slog.String("upstream", upstream))
return nil
}

// Archive the ./lfs/ subtree relative to .git/ so the client can extract
// directly into .git/ and get .git/lfs/objects/xx/yy/sha256.
cacheKey := cache.NewKey(upstream + ".lfs-snapshot")
ttl := 7 * 24 * time.Hour
gitDir := filepath.Join(lfsWorkDir, ".git")

if err := snapshot.CreateSubdir(ctx, s.cache, cacheKey, gitDir, "lfs", ttl); err != nil {
return errors.Wrap(err, "create LFS snapshot")
}

logger.InfoContext(ctx, "LFS snapshot generation completed", slog.String("upstream", upstream))
return nil
}

// orgFromUpstreamURL extracts the GitHub organisation name from an upstream URL.
// Returns "" for non-github.com URLs or malformed paths.
func orgFromUpstreamURL(upstream string) string {
trimmed := strings.TrimPrefix(upstream, "https://github.com/")
if trimmed == upstream {
return "" // not a github.com URL
}
parts := strings.SplitN(trimmed, "/", 2)
if len(parts) == 0 {
return ""
}
return parts[0]
}

func (s *Strategy) scheduleLFSSnapshotJobs(repo *gitclone.Repository) {
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "lfs-snapshot-periodic", s.config.SnapshotInterval, func(ctx context.Context) error {
return s.generateAndUploadLFSSnapshot(ctx, repo)
})
}

func (s *Strategy) handleLFSSnapshotRequest(w http.ResponseWriter, r *http.Request, host, pathValue string) {
ctx := r.Context()
logger := logging.FromContext(ctx)

repoPath := ExtractRepoPath(strings.TrimSuffix(pathValue, "/lfs-snapshot.tar.zst"))
upstreamURL := "https://" + host + "/" + repoPath

cacheKey := cache.NewKey(upstreamURL + ".lfs-snapshot")
reader, headers, err := s.cache.Open(ctx, cacheKey)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
http.Error(w, "LFS snapshot not found", http.StatusNotFound)
return
}
logger.ErrorContext(ctx, "Failed to open LFS snapshot from cache", "upstream", upstreamURL, "error", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
defer reader.Close()

for key, values := range headers {
for _, value := range values {
w.Header().Add(key, value)
}
}
if _, err := io.Copy(w, reader); err != nil {
logger.ErrorContext(ctx, "Failed to stream LFS snapshot", "upstream", upstreamURL, "error", err)
}
}