Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 204 additions & 16 deletions core/services/nodes/file_stager_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,23 +101,52 @@ func (h *HTTPFileStager) EnsureRemote(ctx context.Context, nodeID, localPath, ke
fileSize := fi.Size()

url := fmt.Sprintf("http://%s/v1/files/%s", addr, key)

// Compute the SHA-256 of the local file once and bind it to every PUT
// attempt — the server uses it to detect mid-flight content drift and
// reject (409) if a partial upload claims a new identity, forcing a clean
// restart.
localHash, err := downloader.CalculateSHA(localPath)
if err != nil {
// Hash failure isn't fatal — we can still upload; we just lose
// resume-safety and end-of-transfer integrity checks.
xlog.Warn("Failed to hash local file for upload integrity check", "localPath", localPath, "error", err)
localHash = ""
}

xlog.Info("Uploading file to remote node", "node", nodeID, "file", filepath.Base(localPath), "size", humanFileSize(fileSize), "url", url)

// Outer time budget: bound the total resumable-upload duration so a
// permanently-unreachable worker doesn't hold the request forever. Default
// matches the existing per-response timeout.
outerBudget := h.resumeBudget()

resumeCtx, cancel := context.WithTimeout(ctx, outerBudget)
defer cancel()

var lastErr error
attempts := h.maxRetries + 1 // maxRetries=3 means 4 total attempts (1 initial + 3 retries)
for attempt := 1; attempt <= attempts; attempt++ {
attempt := 0
for {
attempt++
if attempt > 1 {
backoff := time.Duration(5<<(attempt-2)) * time.Second // 5s, 10s, 20s
backoff := nextBackoff(attempt)
xlog.Warn("Retrying file upload", "node", nodeID, "file", filepath.Base(localPath),
"attempt", attempt, "of", attempts, "backoff", backoff, "lastError", lastErr)
"attempt", attempt, "backoff", backoff, "lastError", lastErr)
select {
case <-ctx.Done():
return "", fmt.Errorf("upload cancelled during retry backoff: %w", ctx.Err())
case <-resumeCtx.Done():
return "", fmt.Errorf("upload cancelled during retry backoff (after %d attempts): %w (last: %v)", attempt-1, resumeCtx.Err(), lastErr)
case <-time.After(backoff):
}
}

result, err := h.doUpload(ctx, addr, nodeID, localPath, key, url, fileSize)
// Determine resume offset from the server before each attempt. A
// HEAD response that reports an in-progress upload (X-Target-SHA256)
// matching ours unlocks resume from the reported size; any other
// outcome (missing file, hash mismatch, partial-of-different-file)
// resets to 0 and uploads the entire file.
startOffset := h.resumeOffset(resumeCtx, addr, key, localHash, fileSize)

result, err := h.doUpload(ctx, resumeCtx, addr, nodeID, localPath, key, url, fileSize, startOffset, localHash)
if err == nil {
if attempt > 1 {
xlog.Info("File upload succeeded after retry", "node", nodeID, "file", filepath.Base(localPath), "attempt", attempt)
Expand All @@ -126,50 +155,190 @@ func (h *HTTPFileStager) EnsureRemote(ctx context.Context, nodeID, localPath, ke
}
lastErr = err

// Non-transient failures (4xx other than 416, hard auth, etc.) abort
// immediately — retrying won't help.
if !isTransientError(err) {
xlog.Error("File upload failed with non-transient error", "node", nodeID, "file", filepath.Base(localPath), "error", err)
return "", err
}

// Caller-cancelled (not deadline) — give up.
if errors.Is(ctx.Err(), context.Canceled) {
return "", fmt.Errorf("upload cancelled by caller after %d attempts: %w", attempt, lastErr)
}

// Outer budget exhausted.
if errors.Is(resumeCtx.Err(), context.DeadlineExceeded) {
return "", fmt.Errorf("uploading %s to node %s failed after %d attempts within %s budget: %w",
localPath, nodeID, attempt, outerBudget, lastErr)
}

xlog.Warn("File upload failed with transient error", "node", nodeID, "file", filepath.Base(localPath),
"attempt", attempt, "of", attempts, "error", err)
"attempt", attempt, "error", err)
}
}

// resumeBudget returns the maximum total time the resumable upload loop will
// spend retrying transient failures end-to-end. Past this budget the upload
// fails rather than spinning forever — 1h covers multi-GB transfers on
// pathological links without letting a wedged server jam the master.
func (h *HTTPFileStager) resumeBudget() time.Duration {
return 1 * time.Hour
}

return "", fmt.Errorf("uploading %s to node %s failed after %d attempts: %w", localPath, nodeID, attempts, lastErr)
// nextBackoff returns the sleep before retry #attempt: 1s, 2s, 4s, ..., capped
// at 30s, with the first sleep (attempt=2) being 1s.
func nextBackoff(attempt int) time.Duration {
if attempt < 2 {
return 0
}
const (
base = 1 * time.Second
ceiling = 30 * time.Second
)
shift := uint(attempt - 2)
if shift > 30 {
shift = 30 // saturate before time.Duration overflows
}
b := base << shift
if b > ceiling || b < 0 {
b = ceiling
}
return b
}

// doUpload performs a single upload attempt.
func (h *HTTPFileStager) doUpload(ctx context.Context, addr, nodeID, localPath, key, url string, fileSize int64) (string, error) {
// resumeOffset asks the server (via HEAD) how many bytes of the current upload
// are already on disk. It returns 0 if the server has no usable partial state
// (no file, finished file with a different hash, or a partial under a
// different target hash). It returns the server-reported size when the
// server's X-Target-SHA256 matches our expected final hash AND the size is
// strictly less than the local file size.
func (h *HTTPFileStager) resumeOffset(ctx context.Context, addr, key, localHash string, fileSize int64) int64 {
if localHash == "" || fileSize <= 0 {
return 0
}
url := fmt.Sprintf("http://%s/v1/files/%s", addr, key)
req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
if err != nil {
return 0
}
if h.token != "" {
req.Header.Set("Authorization", "Bearer "+h.token)
}
resp, err := h.client.Do(req)
if err != nil {
return 0
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return 0
}

sizeStr := resp.Header.Get(HeaderFileSize)
if sizeStr == "" {
return 0
}
size, err := strconv.ParseInt(sizeStr, 10, 64)
if err != nil || size <= 0 || size >= fileSize {
return 0
}

target := resp.Header.Get(HeaderTargetSHA256)
if target == "" || !strings.EqualFold(target, localHash) {
// No partial-upload metadata, or it's for a different target.
return 0
}

xlog.Info("Resuming upload from server-reported offset", "key", key, "offset", size, "total", fileSize)
return size
}

// doUpload performs a single upload attempt. When startOffset > 0 the request
// is sent as a resumable PUT with a Content-Range header, transferring only
// the bytes from startOffset to fileSize-1. The outerCtx is the long-lived
// resume budget; reqCtx is what's bound to the request (currently the same as
// the parent ctx, since http.Client doesn't expose a per-request timeout).
func (h *HTTPFileStager) doUpload(ctx, outerCtx context.Context, addr, nodeID, localPath, key, url string, fileSize, startOffset int64, expectedHash string) (string, error) {
if startOffset < 0 || startOffset > fileSize {
startOffset = 0
}

f, err := os.Open(localPath)
if err != nil {
return "", fmt.Errorf("opening local file %s: %w", localPath, err)
}
defer f.Close()

if startOffset > 0 {
if _, err := f.Seek(startOffset, io.SeekStart); err != nil {
return "", fmt.Errorf("seeking to offset %d in %s: %w", startOffset, localPath, err)
}
}

chunkLen := fileSize - startOffset

var body io.Reader = f
cb := StagingProgressFromContext(ctx)
// For files > 100MB or when a progress callback is set, wrap with progress reporting
// For files > 100MB or when a progress callback is set, wrap with progress reporting.
// We report against the FULL fileSize (not the chunkLen) so a resumed upload's
// progress bar starts from the actual completed fraction rather than at 0%.
const progressThreshold = 100 << 20
if fileSize > progressThreshold || cb != nil {
body = newProgressReader(f, fileSize, filepath.Base(localPath), nodeID, cb)
pr := newProgressReader(f, fileSize, filepath.Base(localPath), nodeID, cb)
pr.read = startOffset // seed prior progress
body = pr
}

req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
// The body length we actually send.
limitedBody := io.LimitReader(body, chunkLen)

req, err := http.NewRequestWithContext(outerCtx, http.MethodPut, url, limitedBody)
if err != nil {
return "", fmt.Errorf("creating request: %w", err)
}
req.ContentLength = fileSize // explicit Content-Length for progress tracking
req.ContentLength = chunkLen
req.Header.Set("Content-Type", "application/octet-stream")
if h.token != "" {
req.Header.Set("Authorization", "Bearer "+h.token)
}
if expectedHash != "" {
// Lets the server detect cross-attempt content drift and reject
// resume with 409 if the local file changed identity.
req.Header.Set(HeaderContentSHA256, expectedHash)
}
if startOffset > 0 || (expectedHash != "" && fileSize > 0) {
// Send Content-Range even on the first chunk (0-...) when we have an
// expected hash, so the server's range-aware branch records the
// target-hash sidecar for future resume attempts.
req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", startOffset, fileSize-1, fileSize))
}

resp, err := h.client.Do(req)
if err != nil {
xlog.Error("File upload failed", "node", nodeID, "file", filepath.Base(localPath), "size", humanFileSize(fileSize), "error", err)
xlog.Error("File upload failed", "node", nodeID, "file", filepath.Base(localPath),
"size", humanFileSize(fileSize), "offset", startOffset, "error", err)
return "", fmt.Errorf("uploading %s to node %s: %w", localPath, nodeID, err)
}
defer resp.Body.Close()

// 308 Permanent Redirect ("Resume Incomplete") means the chunk landed but
// the upload as a whole hasn't completed. From our perspective the
// connection survived and the server has more bytes than before — but
// since we always send the whole remainder, hitting 308 means the server
// truncated us. Treat as transient so the retry loop re-HEADs and tries
// again from the new offset.
if resp.StatusCode == http.StatusPermanentRedirect {
body, _ := io.ReadAll(resp.Body)
return "", &transientStatusError{status: resp.StatusCode, msg: fmt.Sprintf("server reports resume-incomplete: %s", string(body))}
}

// 416 Range Not Satisfiable: client/server disagree on offset. Treat as
// transient — the next iteration re-HEADs to learn the correct offset.
if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
body, _ := io.ReadAll(resp.Body)
return "", &transientStatusError{status: resp.StatusCode, msg: fmt.Sprintf("range not satisfiable: %s", string(body))}
}

if resp.StatusCode != http.StatusOK {
respBody, _ := io.ReadAll(resp.Body)
xlog.Error("File upload rejected by remote node", "node", nodeID, "file", filepath.Base(localPath), "status", resp.StatusCode, "response", string(respBody))
Expand All @@ -187,11 +356,30 @@ func (h *HTTPFileStager) doUpload(ctx context.Context, addr, nodeID, localPath,
return result.LocalPath, nil
}

// transientStatusError wraps an HTTP status that should be treated as
// transient by the upload retry loop.
type transientStatusError struct {
status int
msg string
}

func (e *transientStatusError) Error() string {
return fmt.Sprintf("HTTP %d: %s", e.status, e.msg)
}

func (e *transientStatusError) Transient() bool { return true }

// isTransientError returns true if the error is likely transient and worth retrying.
func isTransientError(err error) bool {
if err == nil {
return false
}
// Errors that explicitly opt into transient semantics (e.g. 308/416 from
// the resumable-upload protocol).
var transient interface{ Transient() bool }
if errors.As(err, &transient) && transient.Transient() {
return true
}
// Connection reset by peer
if errors.Is(err, syscall.ECONNRESET) {
return true
Expand Down
Loading
Loading