From 9ff179d1255552d87107c17bb533ba8652dfb581 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 31 May 2026 08:42:09 +0000 Subject: [PATCH] feat(distributed): resumable file uploads via HTTP Content-Range MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Large model GGUFs (multi-GB) transferred between master and worker over flaky / bandwidth-throttled paths (e.g. libp2p relays with byte caps) used to restart from byte 0 on every transport error. This change adds standard HTTP Range/resume semantics to the worker's PUT /v1/files/<key> endpoint and teaches the master-side HTTPFileStager to consult the worker for the last accepted offset and resume from there. Server side (file_transfer_server.go): - PUT now honors Content-Range: bytes <start>-<end>/<total>. The handler validates that <start> matches the current on-disk size; mismatches return 416 with the actual size in X-File-Size. - Mid-upload chunks return 308 Permanent Redirect ("Resume Incomplete") with the new size, so the client can keep going. - An optional X-Content-SHA256 request header binds an upload to a target hash; cross-attempt drift returns 409. On the final chunk the server re-computes SHA-256 and returns 400 if it doesn't match. - HEAD now advertises Accept-Ranges: bytes and Content-Length, and exposes X-Target-SHA256 for in-progress files (so clients can resume only when the partial bytes belong to the file they want to upload). - Legacy PUTs with no Content-Range keep the original truncate-create semantics — zero behavior change on the happy path. Client side (file_stager_http.go): - Pre-PUT HEAD probe reads X-File-Size + X-Target-SHA256 to determine the resume offset. - doUpload seeks to that offset and sends Content-Range + X-Content-SHA256. - Retry loop switches from a fixed 3 retries / 5s-10s-20s backoff to an outer time budget with exponential backoff (1s -> 30s cap), so a 5GB upload over a flaky link can outlast many short disconnects. 
- 308 and 416 responses are treated as transient: the next iteration re-HEADs to learn the correct offset. Tests: - Two-chunk Content-Range round-trip produces the correct file + sidecar. - 416 on a Content-Range/file-size mismatch. - 409 on X-Content-SHA256 drift between chunks. - 400 on final-hash mismatch. - HEAD on a partial upload exposes X-Target-SHA256 (not a misleading hash-of-partial-bytes via X-Content-SHA256). - Pre-existing finished file with a different hash is transparently overwritten when a new PUT starts at byte 0. - End-to-end resume: EnsureRemote against a worker that already holds a partial file transfers only the remainder. - Mid-stream connection drop on attempt #1 is recovered by attempt #2 resuming from the partial offset. Assisted-by: Claude:claude-opus-4-7 Signed-off-by: Ettore Di Giacinto --- core/services/nodes/file_stager_http.go | 220 +++++++++++- core/services/nodes/file_transfer_server.go | 263 +++++++++++++- .../nodes/file_transfer_server_test.go | 328 ++++++++++++++++++ 3 files changed, 793 insertions(+), 18 deletions(-) diff --git a/core/services/nodes/file_stager_http.go b/core/services/nodes/file_stager_http.go index 6131f6f79aae..90faee17217c 100644 --- a/core/services/nodes/file_stager_http.go +++ b/core/services/nodes/file_stager_http.go @@ -101,23 +101,52 @@ func (h *HTTPFileStager) EnsureRemote(ctx context.Context, nodeID, localPath, ke fileSize := fi.Size() url := fmt.Sprintf("http://%s/v1/files/%s", addr, key) + + // Compute the SHA-256 of the local file once and bind it to every PUT + // attempt — the server uses it to detect mid-flight content drift and + // reject (409) if a partial upload claims a new identity, forcing a clean + // restart. + localHash, err := downloader.CalculateSHA(localPath) + if err != nil { + // Hash failure isn't fatal — we can still upload; we just lose + // resume-safety and end-of-transfer integrity checks. 
+ xlog.Warn("Failed to hash local file for upload integrity check", "localPath", localPath, "error", err) + localHash = "" + } + xlog.Info("Uploading file to remote node", "node", nodeID, "file", filepath.Base(localPath), "size", humanFileSize(fileSize), "url", url) + // Outer time budget: bound the total resumable-upload duration so a + // permanently-unreachable worker doesn't hold the request forever. Default + // matches the existing per-response timeout. + outerBudget := h.resumeBudget() + + resumeCtx, cancel := context.WithTimeout(ctx, outerBudget) + defer cancel() + var lastErr error - attempts := h.maxRetries + 1 // maxRetries=3 means 4 total attempts (1 initial + 3 retries) - for attempt := 1; attempt <= attempts; attempt++ { + attempt := 0 + for { + attempt++ if attempt > 1 { - backoff := time.Duration(5<<(attempt-2)) * time.Second // 5s, 10s, 20s + backoff := nextBackoff(attempt) xlog.Warn("Retrying file upload", "node", nodeID, "file", filepath.Base(localPath), - "attempt", attempt, "of", attempts, "backoff", backoff, "lastError", lastErr) + "attempt", attempt, "backoff", backoff, "lastError", lastErr) select { - case <-ctx.Done(): - return "", fmt.Errorf("upload cancelled during retry backoff: %w", ctx.Err()) + case <-resumeCtx.Done(): + return "", fmt.Errorf("upload cancelled during retry backoff (after %d attempts): %w (last: %v)", attempt-1, resumeCtx.Err(), lastErr) case <-time.After(backoff): } } - result, err := h.doUpload(ctx, addr, nodeID, localPath, key, url, fileSize) + // Determine resume offset from the server before each attempt. A + // HEAD response that reports an in-progress upload (X-Target-SHA256) + // matching ours unlocks resume from the reported size; any other + // outcome (missing file, hash mismatch, partial-of-different-file) + // resets to 0 and uploads the entire file. 
+ startOffset := h.resumeOffset(resumeCtx, addr, key, localHash, fileSize) + + result, err := h.doUpload(ctx, resumeCtx, addr, nodeID, localPath, key, url, fileSize, startOffset, localHash) if err == nil { if attempt > 1 { xlog.Info("File upload succeeded after retry", "node", nodeID, "file", filepath.Base(localPath), "attempt", attempt) @@ -126,50 +155,190 @@ func (h *HTTPFileStager) EnsureRemote(ctx context.Context, nodeID, localPath, ke } lastErr = err + // Non-transient failures (4xx other than 416, hard auth, etc.) abort + // immediately — retrying won't help. if !isTransientError(err) { xlog.Error("File upload failed with non-transient error", "node", nodeID, "file", filepath.Base(localPath), "error", err) return "", err } + + // Caller-cancelled (not deadline) — give up. + if errors.Is(ctx.Err(), context.Canceled) { + return "", fmt.Errorf("upload cancelled by caller after %d attempts: %w", attempt, lastErr) + } + + // Outer budget exhausted. + if errors.Is(resumeCtx.Err(), context.DeadlineExceeded) { + return "", fmt.Errorf("uploading %s to node %s failed after %d attempts within %s budget: %w", + localPath, nodeID, attempt, outerBudget, lastErr) + } + xlog.Warn("File upload failed with transient error", "node", nodeID, "file", filepath.Base(localPath), - "attempt", attempt, "of", attempts, "error", err) + "attempt", attempt, "error", err) } +} + +// resumeBudget returns the maximum total time the resumable upload loop will +// spend retrying transient failures end-to-end. Past this budget the upload +// fails rather than spinning forever — 1h covers multi-GB transfers on +// pathological links without letting a wedged server jam the master. 
+func (h *HTTPFileStager) resumeBudget() time.Duration { + return 1 * time.Hour +} - return "", fmt.Errorf("uploading %s to node %s failed after %d attempts: %w", localPath, nodeID, attempts, lastErr) +// nextBackoff returns the sleep before retry #attempt: 1s, 2s, 4s, ..., capped +// at 30s, with the first sleep (attempt=2) being 1s. +func nextBackoff(attempt int) time.Duration { + if attempt < 2 { + return 0 + } + const ( + base = 1 * time.Second + ceiling = 30 * time.Second + ) + shift := uint(attempt - 2) + if shift > 30 { + shift = 30 // saturate before time.Duration overflows + } + b := base << shift + if b > ceiling || b < 0 { + b = ceiling + } + return b } -// doUpload performs a single upload attempt. -func (h *HTTPFileStager) doUpload(ctx context.Context, addr, nodeID, localPath, key, url string, fileSize int64) (string, error) { +// resumeOffset asks the server (via HEAD) how many bytes of the current upload +// are already on disk. It returns 0 if the server has no usable partial state +// (no file, finished file with a different hash, or a partial under a +// different target hash). It returns the server-reported size when the +// server's X-Target-SHA256 matches our expected final hash AND the size is +// strictly less than the local file size. 
+func (h *HTTPFileStager) resumeOffset(ctx context.Context, addr, key, localHash string, fileSize int64) int64 { + if localHash == "" || fileSize <= 0 { + return 0 + } + url := fmt.Sprintf("http://%s/v1/files/%s", addr, key) + req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil) + if err != nil { + return 0 + } + if h.token != "" { + req.Header.Set("Authorization", "Bearer "+h.token) + } + resp, err := h.client.Do(req) + if err != nil { + return 0 + } + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode != http.StatusOK { + return 0 + } + + sizeStr := resp.Header.Get(HeaderFileSize) + if sizeStr == "" { + return 0 + } + size, err := strconv.ParseInt(sizeStr, 10, 64) + if err != nil || size <= 0 || size >= fileSize { + return 0 + } + + target := resp.Header.Get(HeaderTargetSHA256) + if target == "" || !strings.EqualFold(target, localHash) { + // No partial-upload metadata, or it's for a different target. + return 0 + } + + xlog.Info("Resuming upload from server-reported offset", "key", key, "offset", size, "total", fileSize) + return size +} + +// doUpload performs a single upload attempt. When startOffset > 0 the request +// is sent as a resumable PUT with a Content-Range header, transferring only +// the bytes from startOffset to fileSize-1. The outerCtx is the long-lived +// resume budget and is the context bound to the HTTP request; ctx is the +// caller's context, from which the staging progress callback is read. 
+func (h *HTTPFileStager) doUpload(ctx, outerCtx context.Context, addr, nodeID, localPath, key, url string, fileSize, startOffset int64, expectedHash string) (string, error) { + if startOffset < 0 || startOffset > fileSize { + startOffset = 0 + } + f, err := os.Open(localPath) if err != nil { return "", fmt.Errorf("opening local file %s: %w", localPath, err) } defer f.Close() + if startOffset > 0 { + if _, err := f.Seek(startOffset, io.SeekStart); err != nil { + return "", fmt.Errorf("seeking to offset %d in %s: %w", startOffset, localPath, err) + } + } + + chunkLen := fileSize - startOffset + var body io.Reader = f cb := StagingProgressFromContext(ctx) - // For files > 100MB or when a progress callback is set, wrap with progress reporting + // For files > 100MB or when a progress callback is set, wrap with progress reporting. + // We report against the FULL fileSize (not the chunkLen) so a resumed upload's + // progress bar starts from the actual completed fraction rather than at 0%. const progressThreshold = 100 << 20 if fileSize > progressThreshold || cb != nil { - body = newProgressReader(f, fileSize, filepath.Base(localPath), nodeID, cb) + pr := newProgressReader(f, fileSize, filepath.Base(localPath), nodeID, cb) + pr.read = startOffset // seed prior progress + body = pr } - req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body) + // The body length we actually send. 
+ limitedBody := io.LimitReader(body, chunkLen) + + req, err := http.NewRequestWithContext(outerCtx, http.MethodPut, url, limitedBody) if err != nil { return "", fmt.Errorf("creating request: %w", err) } - req.ContentLength = fileSize // explicit Content-Length for progress tracking + req.ContentLength = chunkLen req.Header.Set("Content-Type", "application/octet-stream") if h.token != "" { req.Header.Set("Authorization", "Bearer "+h.token) } + if expectedHash != "" { + // Lets the server detect cross-attempt content drift and reject + // resume with 409 if the local file changed identity. + req.Header.Set(HeaderContentSHA256, expectedHash) + } + if startOffset > 0 || (expectedHash != "" && fileSize > 0) { + // Send Content-Range even on the first chunk (0-...) when we have an + // expected hash, so the server's range-aware branch records the + // target-hash sidecar for future resume attempts. + req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", startOffset, fileSize-1, fileSize)) + } resp, err := h.client.Do(req) if err != nil { - xlog.Error("File upload failed", "node", nodeID, "file", filepath.Base(localPath), "size", humanFileSize(fileSize), "error", err) + xlog.Error("File upload failed", "node", nodeID, "file", filepath.Base(localPath), + "size", humanFileSize(fileSize), "offset", startOffset, "error", err) return "", fmt.Errorf("uploading %s to node %s: %w", localPath, nodeID, err) } defer resp.Body.Close() + // 308 Permanent Redirect ("Resume Incomplete") means the chunk landed but + // the upload as a whole hasn't completed. From our perspective the + // connection survived and the server has more bytes than before — but + // since we always send the whole remainder, hitting 308 means the server + // truncated us. Treat as transient so the retry loop re-HEADs and tries + // again from the new offset. 
+ if resp.StatusCode == http.StatusPermanentRedirect { + body, _ := io.ReadAll(resp.Body) + return "", &transientStatusError{status: resp.StatusCode, msg: fmt.Sprintf("server reports resume-incomplete: %s", string(body))} + } + + // 416 Range Not Satisfiable: client/server disagree on offset. Treat as + // transient — the next iteration re-HEADs to learn the correct offset. + if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable { + body, _ := io.ReadAll(resp.Body) + return "", &transientStatusError{status: resp.StatusCode, msg: fmt.Sprintf("range not satisfiable: %s", string(body))} + } + if resp.StatusCode != http.StatusOK { respBody, _ := io.ReadAll(resp.Body) xlog.Error("File upload rejected by remote node", "node", nodeID, "file", filepath.Base(localPath), "status", resp.StatusCode, "response", string(respBody)) @@ -187,11 +356,30 @@ func (h *HTTPFileStager) doUpload(ctx context.Context, addr, nodeID, localPath, return result.LocalPath, nil } +// transientStatusError wraps an HTTP status that should be treated as +// transient by the upload retry loop. +type transientStatusError struct { + status int + msg string +} + +func (e *transientStatusError) Error() string { + return fmt.Sprintf("HTTP %d: %s", e.status, e.msg) +} + +func (e *transientStatusError) Transient() bool { return true } + // isTransientError returns true if the error is likely transient and worth retrying. func isTransientError(err error) bool { if err == nil { return false } + // Errors that explicitly opt into transient semantics (e.g. 308/416 from + // the resumable-upload protocol). 
+ var transient interface{ Transient() bool } + if errors.As(err, &transient) && transient.Transient() { + return true + } // Connection reset by peer if errors.Is(err, syscall.ECONNRESET) { return true diff --git a/core/services/nodes/file_transfer_server.go b/core/services/nodes/file_transfer_server.go index 1607b85a7ff0..6b7a0343ccf4 100644 --- a/core/services/nodes/file_transfer_server.go +++ b/core/services/nodes/file_transfer_server.go @@ -30,7 +30,16 @@ const ( HeaderContentSHA256 = "X-Content-SHA256" HeaderLocalPath = "X-Local-Path" HeaderFileSize = "X-File-Size" - hashSidecarSuffix = ".sha256" + // HeaderTargetSHA256 is set on HEAD responses for partial (resumable) uploads + // to expose the expected final SHA-256 of the in-progress file. When set, + // the file on disk is not yet the full content — the client may resume by + // PUT'ing the remainder with a matching X-Content-SHA256 header. + HeaderTargetSHA256 = "X-Target-SHA256" + hashSidecarSuffix = ".sha256" + // targetSidecarSuffix stores the expected final SHA-256 of a partially + // uploaded file. Used to detect mid-flight content mismatches across + // resumed PUT requests. + targetSidecarSuffix = ".sha256.target" ) // StartFileTransferServer starts a small HTTP server for file transfer in distributed mode. @@ -169,7 +178,25 @@ func handleHead(w http.ResponseWriter, r *http.Request, stagingDir, modelsDir, d } w.Header().Set(HeaderFileSize, strconv.FormatInt(info.Size(), 10)) + w.Header().Set("Content-Length", strconv.FormatInt(info.Size(), 10)) w.Header().Set(HeaderLocalPath, filePath) + // Advertise resumable-upload support so clients know they may send + // Content-Range PUTs to continue partial transfers. + w.Header().Set("Accept-Ranges", "bytes") + + // If a target-hash sidecar is present the file on disk is a partial + // upload, not a finished file. 
Expose the expected final hash via + // X-Target-SHA256 and skip emitting X-Content-SHA256 (which would otherwise + // be the hash of just the bytes received so far — misleading for clients + // trying to decide whether the file is "the right one"). + if target, err := os.ReadFile(filePath + targetSidecarSuffix); err == nil { + t := strings.TrimSpace(string(target)) + if len(t) == 64 { + w.Header().Set(HeaderTargetSHA256, t) + w.WriteHeader(http.StatusOK) + return + } + } hashHex, err := computeAndCacheHash(filePath) if err != nil { @@ -181,6 +208,55 @@ func handleHead(w http.ResponseWriter, r *http.Request, stagingDir, modelsDir, d w.WriteHeader(http.StatusOK) } +// contentRange describes a parsed Content-Range request header of the form +// "bytes <start>-<end>/<total>". An end of -1 means the request is open-ended +// (unknown end), which is unusual for uploads but accepted. +type contentRange struct { + start int64 + end int64 + total int64 +} + +// parseContentRange parses a Content-Range header value of the form +// "bytes <start>-<end>/<total>". RFC 9110 §14.4. +// Returns (nil, nil) when the header is empty (no range request). 
+func parseContentRange(h string) (*contentRange, error) { + h = strings.TrimSpace(h) + if h == "" { + return nil, nil + } + const prefix = "bytes " + if !strings.HasPrefix(h, prefix) { + return nil, fmt.Errorf("invalid Content-Range: missing %q prefix", strings.TrimSpace(prefix)) + } + spec := strings.TrimSpace(h[len(prefix):]) + slash := strings.IndexByte(spec, '/') + if slash < 0 { + return nil, fmt.Errorf("invalid Content-Range: missing /total") + } + rangePart, totalPart := spec[:slash], spec[slash+1:] + dash := strings.IndexByte(rangePart, '-') + if dash < 0 { + return nil, fmt.Errorf("invalid Content-Range: missing - separator") + } + start, err := strconv.ParseInt(strings.TrimSpace(rangePart[:dash]), 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid Content-Range start: %w", err) + } + end, err := strconv.ParseInt(strings.TrimSpace(rangePart[dash+1:]), 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid Content-Range end: %w", err) + } + total, err := strconv.ParseInt(strings.TrimSpace(totalPart), 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid Content-Range total: %w", err) + } + if start < 0 || end < start || total < end+1 { + return nil, fmt.Errorf("invalid Content-Range range: %d-%d/%d", start, end, total) + } + return &contentRange{start: start, end: end, total: total}, nil +} + func handleUpload(w http.ResponseWriter, r *http.Request, stagingDir, modelsDir, dataDir, key string, maxUploadSize int64) { if key == "" { http.Error(w, "key is required", http.StatusBadRequest) @@ -191,7 +267,19 @@ func handleUpload(w http.ResponseWriter, r *http.Request, stagingDir, modelsDir, r.Body = http.MaxBytesReader(w, r.Body, maxUploadSize) } - xlog.Info("Receiving file upload", "key", key, "contentLength", r.ContentLength, "remote", r.RemoteAddr) + // Parse optional Content-Range for resumable uploads. 
+ cr, err := parseContentRange(r.Header.Get("Content-Range")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // Optional expected total-file SHA-256 used to detect cross-attempt + // content drift on resume. + expectedFinalHash := strings.TrimSpace(r.Header.Get(HeaderContentSHA256)) + + xlog.Info("Receiving file upload", "key", key, "contentLength", r.ContentLength, + "contentRange", r.Header.Get("Content-Range"), "remote", r.RemoteAddr) // Route keyed files to the appropriate directory targetDir, relName := resolveKeyToDir(key, stagingDir, modelsDir, dataDir) @@ -208,6 +296,21 @@ func handleUpload(w http.ResponseWriter, r *http.Request, stagingDir, modelsDir, return } + if cr == nil { + // Non-resumable (legacy) path: truncate-create, single fire-and-forget. + handleFullUpload(w, r, dstPath, key, expectedFinalHash) + return + } + + handleRangeUpload(w, r, dstPath, key, cr, expectedFinalHash) +} + +// handleFullUpload writes the entire request body to dstPath, replacing any +// existing content. This is the legacy happy-path with no Range header. +func handleFullUpload(w http.ResponseWriter, r *http.Request, dstPath, key, expectedFinalHash string) { + // Reset any in-progress resumable state. 
+ _ = os.Remove(dstPath + targetSidecarSuffix) + f, err := os.Create(dstPath) if err != nil { http.Error(w, fmt.Sprintf("creating file: %v", err), http.StatusInternalServerError) @@ -226,6 +329,14 @@ func handleUpload(w http.ResponseWriter, r *http.Request, stagingDir, modelsDir, } hashHex := hex.EncodeToString(hasher.Sum(nil)) + if expectedFinalHash != "" && !strings.EqualFold(expectedFinalHash, hashHex) { + _ = os.Remove(dstPath) + _ = os.Remove(dstPath + hashSidecarSuffix) + xlog.Error("Uploaded file SHA-256 mismatch", "key", key, "expected", expectedFinalHash, "got", hashHex) + http.Error(w, fmt.Sprintf("sha256 mismatch: expected %s got %s", expectedFinalHash, hashHex), http.StatusBadRequest) + return + } + if err := os.WriteFile(dstPath+hashSidecarSuffix, []byte(hashHex), 0640); err != nil { xlog.Warn("Failed to write hash sidecar", "path", dstPath+hashSidecarSuffix, "error", err) } @@ -238,6 +349,154 @@ func handleUpload(w http.ResponseWriter, r *http.Request, stagingDir, modelsDir, } } +// handleRangeUpload appends a Content-Range slice to dstPath, validating that +// the request starts at the current file size. When the slice completes the +// transfer (end+1 == total), it validates the optional expected final hash and +// writes the sidecar. +func handleRangeUpload(w http.ResponseWriter, r *http.Request, dstPath, key string, cr *contentRange, expectedFinalHash string) { + // Determine the current on-disk size (0 if missing). + var currentSize int64 + if info, err := os.Stat(dstPath); err == nil { + if info.IsDir() { + http.Error(w, "destination is a directory", http.StatusBadRequest) + return + } + currentSize = info.Size() + } else if !os.IsNotExist(err) { + http.Error(w, fmt.Sprintf("stat dst: %v", err), http.StatusInternalServerError) + return + } + + targetSidecar := dstPath + targetSidecarSuffix + + // Decide whether the existing on-disk bytes (if any) belong to the same + // logical file the client is uploading now. 
If they don't, and the client + // is starting from byte 0, we transparently truncate the old file and + // proceed — this is the natural "re-upload" case. + if cr.start == 0 && currentSize > 0 { + sameFile := false + if expectedFinalHash != "" { + // Compare the client's declared target hash against either an + // in-progress target sidecar OR the completed-file sidecar. + if t, err := os.ReadFile(targetSidecar); err == nil { + if strings.EqualFold(strings.TrimSpace(string(t)), expectedFinalHash) { + sameFile = true + } + } else if h, err := os.ReadFile(dstPath + hashSidecarSuffix); err == nil { + if strings.EqualFold(strings.TrimSpace(string(h)), expectedFinalHash) { + sameFile = true + } + } + } + if !sameFile { + // Different file content claimed under the same key — drop any + // existing bytes (completed or partial) so the new upload starts + // from a clean slate. + _ = os.Remove(dstPath) + _ = os.Remove(dstPath + hashSidecarSuffix) + _ = os.Remove(targetSidecar) + currentSize = 0 + } + } + + // Cross-attempt consistency: if there's an in-progress target sidecar with + // a different hash than what's now being claimed, force a restart. + if expectedFinalHash != "" && cr.start > 0 { + prev, _ := os.ReadFile(targetSidecar) + prevHash := strings.TrimSpace(string(prev)) + if prevHash != "" && !strings.EqualFold(prevHash, expectedFinalHash) { + _ = os.Remove(dstPath) + _ = os.Remove(dstPath + hashSidecarSuffix) + _ = os.Remove(targetSidecar) + http.Error(w, fmt.Sprintf("X-Content-SHA256 mismatch with in-progress upload (was %s, now %s); restart from byte 0", prevHash, expectedFinalHash), http.StatusConflict) + return + } + } + + // The most important invariant: the client must continue from exactly + // where the server left off. If not, return 416 with the current size in + // the X-File-Size header so the client can re-sync. 
+ if cr.start != currentSize { + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", cr.total)) + w.Header().Set(HeaderFileSize, strconv.FormatInt(currentSize, 10)) + http.Error(w, fmt.Sprintf("Content-Range start %d does not match current file size %d", cr.start, currentSize), http.StatusRequestedRangeNotSatisfiable) + return + } + + // Open the file in append mode (create if missing). + f, err := os.OpenFile(dstPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0640) + if err != nil { + http.Error(w, fmt.Sprintf("opening dst: %v", err), http.StatusInternalServerError) + return + } + defer func() { _ = f.Close() }() + + // Persist the declared expected hash so subsequent chunks can be + // cross-checked. + if expectedFinalHash != "" { + if err := os.WriteFile(targetSidecar, []byte(expectedFinalHash), 0640); err != nil { + xlog.Warn("Failed to write target hash sidecar", "path", targetSidecar, "error", err) + } + } + + expectedChunkLen := cr.end - cr.start + 1 + limited := io.LimitReader(r.Body, expectedChunkLen) + n, err := io.Copy(f, limited) + if err != nil { + xlog.Error("Range upload chunk failed", "key", key, "bytesReceived", n, "expected", expectedChunkLen, "remote", r.RemoteAddr, "error", err) + http.Error(w, fmt.Sprintf("writing file: %v", err), http.StatusInternalServerError) + return + } + if n != expectedChunkLen { + xlog.Error("Range upload chunk short", "key", key, "bytesReceived", n, "expected", expectedChunkLen, "remote", r.RemoteAddr) + http.Error(w, fmt.Sprintf("short body: got %d expected %d", n, expectedChunkLen), http.StatusBadRequest) + return + } + + newSize := currentSize + n + + // If this chunk does not complete the transfer, return 308 Resume + // Incomplete (semantically aligns with the GCS/Tus resumable convention, + // which most language ecosystems treat as "keep going") and report the + // current size so the client can continue. 
+ if newSize < cr.total { + w.Header().Set("Range", fmt.Sprintf("bytes=0-%d", newSize-1)) + w.Header().Set(HeaderFileSize, strconv.FormatInt(newSize, 10)) + w.WriteHeader(http.StatusPermanentRedirect) // 308 — "Resume Incomplete" + xlog.Debug("Range upload chunk accepted", "key", key, "newSize", newSize, "total", cr.total) + return + } + + // Upload complete — compute the final hash by re-reading the file. + finalHash, err := downloader.CalculateSHA(dstPath) + if err != nil { + xlog.Error("Failed to compute final hash on range upload", "path", dstPath, "error", err) + http.Error(w, fmt.Sprintf("computing final hash: %v", err), http.StatusInternalServerError) + return + } + if expectedFinalHash != "" && !strings.EqualFold(expectedFinalHash, finalHash) { + _ = os.Remove(dstPath) + _ = os.Remove(dstPath + hashSidecarSuffix) + _ = os.Remove(targetSidecar) + xlog.Error("Resumed upload SHA-256 mismatch", "key", key, "expected", expectedFinalHash, "got", finalHash) + http.Error(w, fmt.Sprintf("sha256 mismatch: expected %s got %s", expectedFinalHash, finalHash), http.StatusBadRequest) + return + } + + if err := os.WriteFile(dstPath+hashSidecarSuffix, []byte(finalHash), 0640); err != nil { + xlog.Warn("Failed to write hash sidecar", "path", dstPath+hashSidecarSuffix, "error", err) + } + // Clear the in-progress sidecar — upload is committed. + _ = os.Remove(targetSidecar) + + xlog.Info("Resumable file upload complete", "key", key, "path", dstPath, "size", newSize, "sha256", finalHash) + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(map[string]string{"local_path": dstPath}); err != nil { + xlog.Warn("Failed to encode upload response", "error", err) + } +} + // computeAndCacheHash returns the SHA-256 hex digest for filePath. // It reads a cached sidecar when available and still fresh (sidecar mtime >= // file mtime), otherwise computes the hash and writes/updates the sidecar. 
diff --git a/core/services/nodes/file_transfer_server_test.go b/core/services/nodes/file_transfer_server_test.go index e5ed9ba2e759..c2c7a44a9135 100644 --- a/core/services/nodes/file_transfer_server_test.go +++ b/core/services/nodes/file_transfer_server_test.go @@ -5,12 +5,16 @@ import ( "context" "crypto/sha256" "encoding/hex" + "fmt" "io" "net/http" "net/http/httptest" "os" "path/filepath" + "strconv" "strings" + "sync" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -559,6 +563,330 @@ var _ = Describe("FileTransferServer", func() { Expect(uploaded).To(Equal(content)) }) }) + + // --- Resumable upload (Content-Range) tests --- + + Describe("Resumable upload (Content-Range)", func() { + // doPut sends a PUT to ts with the given body, headers, and key. + doPut := func(ts *httptest.Server, token, key string, body []byte, headers map[string]string) (*http.Response, []byte) { + req, err := http.NewRequest(http.MethodPut, ts.URL+"/v1/files/"+key, bytes.NewReader(body)) + Expect(err).ToNot(HaveOccurred()) + req.Header.Set("Authorization", "Bearer "+token) + for k, v := range headers { + req.Header.Set(k, v) + } + resp, err := http.DefaultClient.Do(req) + Expect(err).ToNot(HaveOccurred()) + defer func() { _ = resp.Body.Close() }() + respBody, _ := io.ReadAll(resp.Body) + return resp, respBody + } + + It("accepts two consecutive Content-Range chunks and produces the full file", func() { + ts, stagingDir, _, _ := setupTestServer("tok", 0) + + full := bytes.Repeat([]byte("abcdefghij"), 20) // 200 bytes + fullHash := sha256Hex(full) + + // Chunk 1: bytes 0-99 + resp1, _ := doPut(ts, "tok", "chunked.bin", full[:100], map[string]string{ + "Content-Range": fmt.Sprintf("bytes 0-99/%d", len(full)), + HeaderContentSHA256: fullHash, + }) + Expect(resp1.StatusCode).To(Equal(http.StatusPermanentRedirect)) + Expect(resp1.Header.Get(HeaderFileSize)).To(Equal("100")) + + // Chunk 2: bytes 100-199 + resp2, _ := doPut(ts, "tok", "chunked.bin", full[100:], 
map[string]string{ + "Content-Range": fmt.Sprintf("bytes 100-199/%d", len(full)), + HeaderContentSHA256: fullHash, + }) + Expect(resp2.StatusCode).To(Equal(http.StatusOK)) + + // File matches the full content + got, err := os.ReadFile(filepath.Join(stagingDir, "chunked.bin")) + Expect(err).ToNot(HaveOccurred()) + Expect(got).To(Equal(full)) + + // Sidecar holds the final hash + sidecar, err := os.ReadFile(filepath.Join(stagingDir, "chunked.bin.sha256")) + Expect(err).ToNot(HaveOccurred()) + Expect(string(sidecar)).To(Equal(fullHash)) + + // Target sidecar (in-progress marker) is cleared once complete + _, err = os.Stat(filepath.Join(stagingDir, "chunked.bin.sha256.target")) + Expect(os.IsNotExist(err)).To(BeTrue()) + }) + + It("returns 416 when Content-Range start does not match current file size", func() { + ts, _, _, _ := setupTestServer("tok", 0) + + full := bytes.Repeat([]byte("x"), 200) + fullHash := sha256Hex(full) + + // First chunk: bytes 0-49 + resp1, _ := doPut(ts, "tok", "mismatch.bin", full[:50], map[string]string{ + "Content-Range": fmt.Sprintf("bytes 0-49/%d", len(full)), + HeaderContentSHA256: fullHash, + }) + Expect(resp1.StatusCode).To(Equal(http.StatusPermanentRedirect)) + + // Skip ahead: server has 50 bytes but client tries to send 100-199. 
+ resp2, _ := doPut(ts, "tok", "mismatch.bin", full[100:200], map[string]string{ + "Content-Range": fmt.Sprintf("bytes 100-199/%d", len(full)), + HeaderContentSHA256: fullHash, + }) + Expect(resp2.StatusCode).To(Equal(http.StatusRequestedRangeNotSatisfiable)) + Expect(resp2.Header.Get(HeaderFileSize)).To(Equal("50")) + }) + + It("returns 409 when X-Content-SHA256 changes between resumed chunks", func() { + ts, _, _, _ := setupTestServer("tok", 0) + + a := bytes.Repeat([]byte("a"), 200) + b := bytes.Repeat([]byte("b"), 200) + + // Chunk 1 (file A): bytes 0-49 + resp1, _ := doPut(ts, "tok", "drifted.bin", a[:50], map[string]string{ + "Content-Range": fmt.Sprintf("bytes 0-49/%d", len(a)), + HeaderContentSHA256: sha256Hex(a), + }) + Expect(resp1.StatusCode).To(Equal(http.StatusPermanentRedirect)) + + // Chunk 2 claims file B's hash for the *same* key — should be rejected. + resp2, _ := doPut(ts, "tok", "drifted.bin", b[50:100], map[string]string{ + "Content-Range": fmt.Sprintf("bytes 50-99/%d", len(b)), + HeaderContentSHA256: sha256Hex(b), + }) + Expect(resp2.StatusCode).To(Equal(http.StatusConflict)) + }) + + It("returns 400 when final SHA-256 does not match the declared target", func() { + ts, _, _, _ := setupTestServer("tok", 0) + + full := bytes.Repeat([]byte("z"), 100) + wrongHash := sha256Hex([]byte("definitely-not-this")) + + resp, _ := doPut(ts, "tok", "bad-hash.bin", full, map[string]string{ + "Content-Range": fmt.Sprintf("bytes 0-99/%d", len(full)), + HeaderContentSHA256: wrongHash, + }) + Expect(resp.StatusCode).To(Equal(http.StatusBadRequest)) + }) + + It("HEAD on a partial upload exposes X-Target-SHA256 and current size", func() { + ts, _, _, _ := setupTestServer("tok", 0) + + full := bytes.Repeat([]byte("q"), 200) + fullHash := sha256Hex(full) + + // One chunk uploaded, file is partial. 
+ resp1, _ := doPut(ts, "tok", "partial.bin", full[:60], map[string]string{ + "Content-Range": fmt.Sprintf("bytes 0-59/%d", len(full)), + HeaderContentSHA256: fullHash, + }) + Expect(resp1.StatusCode).To(Equal(http.StatusPermanentRedirect)) + + req, err := http.NewRequest(http.MethodHead, ts.URL+"/v1/files/partial.bin", nil) + Expect(err).ToNot(HaveOccurred()) + req.Header.Set("Authorization", "Bearer tok") + resp, err := http.DefaultClient.Do(req) + Expect(err).ToNot(HaveOccurred()) + _ = resp.Body.Close() + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + Expect(resp.Header.Get(HeaderFileSize)).To(Equal("60")) + Expect(resp.Header.Get("Accept-Ranges")).To(Equal("bytes")) + Expect(resp.Header.Get(HeaderTargetSHA256)).To(Equal(fullHash)) + // While the upload is in progress we must NOT expose a misleading + // X-Content-SHA256 of the bytes-so-far — clients use HeaderContentSHA256 + // only for completed files. + Expect(resp.Header.Get(HeaderContentSHA256)).To(BeEmpty()) + }) + + It("transparently overwrites an existing finished file when client starts from byte 0 with a new hash", func() { + ts, stagingDir, _, _ := setupTestServer("tok", 0) + + // Pre-place a finished file (sidecar present, no target sidecar). + oldContent := []byte("ancient version") + err := os.WriteFile(filepath.Join(stagingDir, "overwrite.bin"), oldContent, 0644) + Expect(err).ToNot(HaveOccurred()) + err = os.WriteFile(filepath.Join(stagingDir, "overwrite.bin.sha256"), []byte(sha256Hex(oldContent)), 0644) + Expect(err).ToNot(HaveOccurred()) + + // New upload with a different target hash, range 0-N/total. 
+ newContent := bytes.Repeat([]byte("new"), 50) // 150 bytes + newHash := sha256Hex(newContent) + + resp, _ := doPut(ts, "tok", "overwrite.bin", newContent, map[string]string{ + "Content-Range": fmt.Sprintf("bytes 0-%d/%d", len(newContent)-1, len(newContent)), + HeaderContentSHA256: newHash, + }) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + + got, err := os.ReadFile(filepath.Join(stagingDir, "overwrite.bin")) + Expect(err).ToNot(HaveOccurred()) + Expect(got).To(Equal(newContent)) + }) + + It("HEAD advertises Accept-Ranges: bytes on completed files", func() { + ts, _, _, _ := setupTestServer("tok", 0) + + content := "done" + doPut(ts, "tok", "ranges-advert.txt", []byte(content), nil) + + req, err := http.NewRequest(http.MethodHead, ts.URL+"/v1/files/ranges-advert.txt", nil) + Expect(err).ToNot(HaveOccurred()) + req.Header.Set("Authorization", "Bearer tok") + resp, err := http.DefaultClient.Do(req) + Expect(err).ToNot(HaveOccurred()) + _ = resp.Body.Close() + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + Expect(resp.Header.Get("Accept-Ranges")).To(Equal("bytes")) + Expect(resp.Header.Get("Content-Length")).To(Equal(strconv.Itoa(len(content)))) + }) + }) + + // --- End-to-end client/server resume tests --- + + Describe("HTTPFileStager resume via EnsureRemote", func() { + It("resumes from server's reported offset when a partial upload exists", func() { + ts, stagingDir, _, _ := setupTestServer("tok", 0) + + // Create the local file (the master's source-of-truth). + localDir := GinkgoT().TempDir() + localPath := filepath.Join(localDir, "resume.bin") + content := bytes.Repeat([]byte("R"), 500) + Expect(os.WriteFile(localPath, content, 0644)).To(Succeed()) + fullHash := sha256Hex(content) + + // Pre-seed the "worker" with the first 200 bytes as if a prior + // attempt had transferred that much, plus a target-hash sidecar + // claiming the full file's hash. 
+ dst := filepath.Join(stagingDir, "resume.bin") + Expect(os.WriteFile(dst, content[:200], 0644)).To(Succeed()) + Expect(os.WriteFile(dst+".sha256.target", []byte(fullHash), 0644)).To(Succeed()) + + addr := strings.TrimPrefix(ts.URL, "http://") + stager := NewHTTPFileStager(func(nodeID string) (string, error) { + return addr, nil + }, "tok") + + remotePath, err := stager.EnsureRemote(context.Background(), "node-1", localPath, "resume.bin") + Expect(err).ToNot(HaveOccurred()) + Expect(remotePath).To(Equal(dst)) + + got, err := os.ReadFile(dst) + Expect(err).ToNot(HaveOccurred()) + Expect(got).To(Equal(content)) + + // Final sidecar should hold the full-file hash. + sidecar, err := os.ReadFile(dst + ".sha256") + Expect(err).ToNot(HaveOccurred()) + Expect(strings.TrimSpace(string(sidecar))).To(Equal(fullHash)) + }) + + It("survives a mid-stream connection drop and resumes on retry", func() { + // Server that drops the connection after writing the first N bytes + // on the FIRST PUT attempt, then behaves normally. + stagingDir := GinkgoT().TempDir() + modelsDir := GinkgoT().TempDir() + dataDir := GinkgoT().TempDir() + + var ( + attemptCount int + attemptMu sync.Mutex + ) + const dropAfter = 80 // bytes the server "accepts" before crashing + + mux := http.NewServeMux() + mux.HandleFunc("/v1/files/", func(w http.ResponseWriter, r *http.Request) { + if !checkBearerToken(r, "tok") { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + key := strings.TrimPrefix(r.URL.Path, "/v1/files/") + if r.Method == http.MethodHead { + handleHead(w, r, stagingDir, modelsDir, dataDir, key) + return + } + if r.Method != http.MethodPut { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + attemptMu.Lock() + attemptCount++ + thisAttempt := attemptCount + attemptMu.Unlock() + + if thisAttempt == 1 { + // Read a bounded prefix into the partial file, then hijack + // the connection and close abruptly to simulate the drop. 
+ cr, err := parseContentRange(r.Header.Get("Content-Range")) + if err != nil || cr == nil { + http.Error(w, "expected content-range", http.StatusBadRequest) + return + } + dst := filepath.Join(stagingDir, key) + f, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + target := r.Header.Get(HeaderContentSHA256) + if target != "" { + _ = os.WriteFile(dst+".sha256.target", []byte(target), 0640) + } + _, _ = io.CopyN(f, r.Body, dropAfter) + _ = f.Close() + + hj, ok := w.(http.Hijacker) + if !ok { + http.Error(w, "hijack unsupported", http.StatusInternalServerError) + return + } + conn, _, err := hj.Hijack() + if err == nil { + _ = conn.Close() // abrupt close — client sees a transport error + } + return + } + + // Subsequent attempts: behave normally. + handleUpload(w, r, stagingDir, modelsDir, dataDir, key, 0) + }) + ts := httptest.NewServer(mux) + DeferCleanup(ts.Close) + + // Build a small "model" file to upload (300 bytes for speed). 
+ localDir := GinkgoT().TempDir() + localPath := filepath.Join(localDir, "flaky.bin") + content := bytes.Repeat([]byte("F"), 300) + Expect(os.WriteFile(localPath, content, 0644)).To(Succeed()) + + addr := strings.TrimPrefix(ts.URL, "http://") + stager := NewHTTPFileStager(func(nodeID string) (string, error) { + return addr, nil + }, "tok") + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + remotePath, err := stager.EnsureRemote(ctx, "node-1", localPath, "flaky.bin") + Expect(err).ToNot(HaveOccurred()) + Expect(remotePath).To(Equal(filepath.Join(stagingDir, "flaky.bin"))) + + // Final file is correct + got, err := os.ReadFile(filepath.Join(stagingDir, "flaky.bin")) + Expect(err).ToNot(HaveOccurred()) + Expect(got).To(Equal(content)) + + // At least one retry happened + attemptMu.Lock() + Expect(attemptCount).To(BeNumerically(">=", 2)) + attemptMu.Unlock() + }) + }) }) func sha256Hex(data []byte) string {