From 7e6c38f839cb5f72aae02ab1d68e220a7a360fd0 Mon Sep 17 00:00:00 2001 From: Swarit Pandey Date: Tue, 12 May 2026 06:16:17 +0530 Subject: [PATCH 1/2] fix(telemetry): detect intercepted uploads on client side MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The agent currently treats any HTTP 200 from the presigned-URL PUT as proof the upload reached S3 and immediately calls notify. A TLS- inspecting proxy, DLP appliance, or outbound-filtering firewall on a customer network can terminate that PUT mid-flight and synthesize a 200 response without forwarding the bytes — the run is then reported as a silent success despite no object ever existing in S3. Two checks, in order: 1. After the PUT returns 200, require x-amz-request-id and/or x-amz-id-2 on the response. Real S3 always sets these; a proxy fake-200 can't reproduce them. Missing both → non-retryable failure with the response Server header captured for forensics. 2. Before calling notify, POST /telemetry/confirm-upload to ask the backend whether the object actually exists in S3. A definitive uploaded=false is fatal (bail with reason). 404 (old backend without the endpoint), non-200, or transport errors fall through to notify, whose own server-side precheck remains the safety net. On success the agent prints a definitive "Upload confirmed in S3 (N bytes)" line, separate from "Backend processing started", so the user can tell upload-landed from processing-kicked-off. Adds tests for: real-S3 happy path, synthetic-200 PUT rejection, confirm=false fatal path, confirm 404 fallback, confirm 5xx fallback. 
Signed-off-by: Swarit Pandey --- internal/telemetry/telemetry.go | 103 ++++++++++++ internal/telemetry/telemetry_test.go | 225 ++++++++++++++++++++++++++- 2 files changed, 327 insertions(+), 1 deletion(-) diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 82a9e72..9735c6b 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -674,6 +674,22 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe } if err == nil && putResp.StatusCode == http.StatusOK { + // A real S3 PUT response always carries x-amz-request-id and + // x-amz-id-2 headers. If both are missing on a "200 OK", the + // response did not originate from AWS — typically a TLS-inspecting + // proxy, DLP appliance, or outbound-filtering firewall has + // terminated the connection and synthesized a success without + // forwarding the body to S3. Treat as a non-retryable failure so + // the run is reported as failed instead of cheerfully claiming + // success on bytes that never reached AWS. + reqID := putResp.Header.Get("x-amz-request-id") + id2 := putResp.Header.Get("x-amz-id-2") + if reqID == "" && id2 == "" { + proxyHint := putResp.Header.Get("Server") + _, _ = io.Copy(io.Discard, putResp.Body) + _ = putResp.Body.Close() + return fmt.Errorf("S3 PUT returned 200 but the response is not from AWS (missing x-amz-request-id and x-amz-id-2; Server=%q) — the upload likely did not reach S3, possibly intercepted by a TLS-inspecting proxy or outbound firewall on this network", proxyHint) + } log.Progress("Uploaded to S3 in %s", elapsed) break } @@ -709,6 +725,20 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe defer func() { _ = putResp.Body.Close() }() _, _ = io.Copy(io.Discard, putResp.Body) + // Ask the backend to confirm the object actually exists in S3 before + // proceeding to notify. 
This catches the rare case where the PUT + // returned an AWS-headers-bearing 200 but the bytes still failed to + // persist, and gives the user a definitive line in the progress log + // independent of "backend processing started". + // + // The endpoint is best-effort from the agent's point of view: a + // definitive uploaded=false is fatal, but transport errors or unknown + // status codes (e.g. an old backend that doesn't expose this route) + // fall through to notify, whose own precheck is the safety net. + if err := confirmUploadInS3(ctx, log, client, urlResp.S3Key, payload.DeviceID); err != nil { + return err + } + // Notify backend log.Progress("Notifying backend of upload...") notifyBody, _ := json.Marshal(map[string]string{ @@ -744,6 +774,79 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe return nil } +// confirmUploadInS3 calls the backend's /telemetry/confirm-upload endpoint +// to get a definitive uploaded/not-uploaded answer for the s3_key the +// agent was issued. Returns a non-nil error only when the backend +// confirms the object is missing (uploaded=false) — anything else +// (transport failure, non-200, parse error, old backend without the +// route) is logged and treated as non-fatal so the run still proceeds +// to notify, which has its own precheck as a safety net. 
+func confirmUploadInS3(ctx context.Context, log *progress.Logger, client *http.Client, s3Key, deviceID string) error { + log.Progress("Confirming upload reached S3...") + + body, _ := json.Marshal(map[string]string{ + "s3_key": s3Key, + "device_id": deviceID, + }) + endpoint := fmt.Sprintf("%s/v1/%s/developer-mdm-agent/telemetry/confirm-upload", + config.APIEndpoint, config.CustomerID) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + log.Warn("confirm-upload: request build failed (%v); proceeding to notify", err) + return nil + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+config.APIKey) + req.Header.Set("X-Agent-Version", buildinfo.Version) + + resp, err := client.Do(req) + if err != nil { + log.Warn("confirm-upload: request failed (%v); proceeding to notify", err) + return nil + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode == http.StatusNotFound { + // Backend predates the confirm-upload endpoint. Fine — notify's + // precheck still catches a missing object server-side. + log.Debug("confirm-upload: endpoint not available on this backend; falling back to notify-only flow") + _, _ = io.Copy(io.Discard, resp.Body) + return nil + } + if resp.StatusCode != http.StatusOK { + log.Warn("confirm-upload: HTTP %d; proceeding to notify", resp.StatusCode) + _, _ = io.Copy(io.Discard, resp.Body) + return nil + } + + var result struct { + Uploaded bool `json:"uploaded"` + SizeBytes int64 `json:"size_bytes"` + Reason string `json:"reason"` + S3Key string `json:"s3_key"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + log.Warn("confirm-upload: malformed response (%v); proceeding to notify", err) + return nil + } + + if !result.Uploaded { + // Definitive negative answer from the backend — the object the + // agent thought it uploaded is not in S3. 
Stop here so the run + // is recorded as failed with a specific reason, rather than + // queueing a doomed worker job. + reason := result.Reason + if reason == "" { + reason = "unknown" + } + return fmt.Errorf("backend reports upload not in S3 (reason=%s, s3_key=%s) — the PUT returned success but no object exists; likely intercepted upstream", reason, s3Key) + } + + log.Progress("Upload confirmed in S3 (%d bytes)", result.SizeBytes) + return nil +} + // gzipBytes returns a gzip-compressed copy of the input bytes. func gzipBytes(data []byte) ([]byte, error) { var buf bytes.Buffer diff --git a/internal/telemetry/telemetry_test.go b/internal/telemetry/telemetry_test.go index 4c7f8fd..afb80d1 100644 --- a/internal/telemetry/telemetry_test.go +++ b/internal/telemetry/telemetry_test.go @@ -10,6 +10,7 @@ import ( "net/http/httptest" "strings" "sync" + "sync/atomic" "testing" "github.com/step-security/dev-machine-guard/internal/config" @@ -50,17 +51,21 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { ) // Mock S3 PUT endpoint — captures the body the agent uploads. + // Emits x-amz-request-id so the client's "real AWS response" check + // (uploadToS3 in telemetry.go) treats this 200 as genuine. s3Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, _ := io.ReadAll(r.Body) mu.Lock() putBody = body putContentType = r.Header.Get("Content-Type") mu.Unlock() + w.Header().Set("x-amz-request-id", "TESTREQID000000") + w.Header().Set("x-amz-id-2", "test-id-2") w.WriteHeader(http.StatusOK) })) defer s3Server.Close() - // Mock backend — handles upload-URL and process-uploaded calls. + // Mock backend — handles upload-URL, confirm-upload, and process-uploaded. 
backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"): @@ -72,6 +77,12 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { "upload_url": s3Server.URL + "/put", "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", }) + case strings.HasSuffix(r.URL.Path, "/telemetry/confirm-upload"): + _ = json.NewEncoder(w).Encode(map[string]any{ + "uploaded": true, + "size_bytes": 4242, + "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", + }) case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): body, _ := io.ReadAll(r.Body) mu.Lock() @@ -156,3 +167,215 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { t.Errorf("expected execution_id=%q in notify body, got %q", testExecutionID, notify["execution_id"]) } } + +// TestUploadToS3_RejectsSynthetic200WithoutAWSHeaders simulates a TLS-inspecting +// proxy / DLP appliance that terminates the agent's outbound PUT to S3 and +// returns a synthetic "200 OK" without forwarding the body. A real S3 response +// always carries x-amz-request-id and x-amz-id-2 — the agent must treat a 200 +// missing both as an upload failure rather than silently calling notify with +// an s3_key whose object was never persisted. +func TestUploadToS3_RejectsSynthetic200WithoutAWSHeaders(t *testing.T) { + var notifyCalls atomic.Int32 + + // Mock "proxy" responding 200 with no AWS headers — what a corporate + // TLS interceptor produces when it terminates the PUT in-flight. 
+ fakeProxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "fake-proxy/1.0") + w.WriteHeader(http.StatusOK) + })) + defer fakeProxy.Close() + + backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"): + _ = json.NewEncoder(w).Encode(map[string]string{ + "upload_url": fakeProxy.URL + "/put", + "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", + }) + case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): + notifyCalls.Add(1) + w.WriteHeader(http.StatusOK) + default: + http.NotFound(w, r) + } + })) + defer backendServer.Close() + + origEndpoint, origCustomer, origKey := config.APIEndpoint, config.CustomerID, config.APIKey + config.APIEndpoint = backendServer.URL + config.CustomerID = "test-customer" + config.APIKey = "test-key" + defer func() { + config.APIEndpoint, config.CustomerID, config.APIKey = origEndpoint, origCustomer, origKey + }() + + payload := &Payload{CustomerID: "test-customer", DeviceID: "dev-1"} + err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), payload, "11111111-2222-4333-8444-555555555555") + if err == nil { + t.Fatalf("uploadToS3 should fail when the PUT response is missing AWS request id headers") + } + if !strings.Contains(err.Error(), "not from AWS") { + t.Errorf("expected error to mention 'not from AWS', got: %v", err) + } + if !strings.Contains(err.Error(), "fake-proxy/1.0") { + t.Errorf("expected error to include the Server header hint, got: %v", err) + } + if got := notifyCalls.Load(); got != 0 { + t.Errorf("notify endpoint must not be called when upload is rejected, got %d call(s)", got) + } +} + +// newAWSHeaderS3Server returns an httptest server that responds 200 to PUTs +// with the AWS request id headers a real S3 sets, so the agent's response +// verification accepts it. 
+func newAWSHeaderS3Server(t *testing.T) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("x-amz-request-id", "TESTREQID000000") + w.Header().Set("x-amz-id-2", "test-id-2") + w.WriteHeader(http.StatusOK) + })) +} + +func withTestConfig(t *testing.T, endpoint string) { + t.Helper() + origEndpoint, origCustomer, origKey := config.APIEndpoint, config.CustomerID, config.APIKey + config.APIEndpoint = endpoint + config.CustomerID = "test-customer" + config.APIKey = "test-key" + t.Cleanup(func() { + config.APIEndpoint, config.CustomerID, config.APIKey = origEndpoint, origCustomer, origKey + }) +} + +// TestUploadToS3_ConfirmUploadFalseIsFatal exercises the "backend confirms +// the object is not in S3" branch — the PUT response looked real (AWS +// headers present) but the backend HEAD says nothing landed. The agent +// must bail and not call notify, because notify would only re-discover +// the same fact moments later. 
+func TestUploadToS3_ConfirmUploadFalseIsFatal(t *testing.T) { + var notifyCalls atomic.Int32 + + s3Server := newAWSHeaderS3Server(t) + defer s3Server.Close() + + backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"): + _ = json.NewEncoder(w).Encode(map[string]string{ + "upload_url": s3Server.URL + "/put", + "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", + }) + case strings.HasSuffix(r.URL.Path, "/telemetry/confirm-upload"): + _ = json.NewEncoder(w).Encode(map[string]any{ + "uploaded": false, + "reason": "object_not_found", + "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", + }) + case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): + notifyCalls.Add(1) + w.WriteHeader(http.StatusOK) + default: + http.NotFound(w, r) + } + })) + defer backendServer.Close() + withTestConfig(t, backendServer.URL) + + err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), + &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}, + "11111111-2222-4333-8444-555555555555") + if err == nil { + t.Fatal("uploadToS3 must fail when backend confirms upload is not in S3") + } + if !strings.Contains(err.Error(), "not in S3") { + t.Errorf("expected error to mention 'not in S3', got: %v", err) + } + if !strings.Contains(err.Error(), "object_not_found") { + t.Errorf("expected error to include the backend reason, got: %v", err) + } + if got := notifyCalls.Load(); got != 0 { + t.Errorf("notify must not be called when confirm reports uploaded=false, got %d call(s)", got) + } +} + +// TestUploadToS3_ConfirmUpload404FallsThroughToNotify covers compatibility +// with older backends that don't expose /telemetry/confirm-upload yet. +// A 404 on confirm must NOT fail the run — notify still gets called and +// the upload completes via the existing flow. 
+func TestUploadToS3_ConfirmUpload404FallsThroughToNotify(t *testing.T) { + var notifyCalls atomic.Int32 + + s3Server := newAWSHeaderS3Server(t) + defer s3Server.Close() + + backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"): + _ = json.NewEncoder(w).Encode(map[string]string{ + "upload_url": s3Server.URL + "/put", + "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", + }) + case strings.HasSuffix(r.URL.Path, "/telemetry/confirm-upload"): + // Simulate an old backend that doesn't know this route. + http.NotFound(w, r) + case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): + notifyCalls.Add(1) + w.WriteHeader(http.StatusOK) + default: + http.NotFound(w, r) + } + })) + defer backendServer.Close() + withTestConfig(t, backendServer.URL) + + err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), + &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}, + "11111111-2222-4333-8444-555555555555") + if err != nil { + t.Fatalf("uploadToS3 must succeed when confirm-upload is unsupported (404), got: %v", err) + } + if got := notifyCalls.Load(); got != 1 { + t.Errorf("notify must still be called when confirm-upload returns 404, got %d call(s)", got) + } +} + +// TestUploadToS3_ConfirmUpload5xxFallsThroughToNotify covers transient +// backend failure of the confirm endpoint. The agent must not fail the +// run for that — notify still has its own server-side precheck. 
+func TestUploadToS3_ConfirmUpload5xxFallsThroughToNotify(t *testing.T) { + var notifyCalls atomic.Int32 + + s3Server := newAWSHeaderS3Server(t) + defer s3Server.Close() + + backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"): + _ = json.NewEncoder(w).Encode(map[string]string{ + "upload_url": s3Server.URL + "/put", + "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", + }) + case strings.HasSuffix(r.URL.Path, "/telemetry/confirm-upload"): + w.WriteHeader(http.StatusBadGateway) + _, _ = w.Write([]byte(`{"error":"s3_check_failed"}`)) + case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): + notifyCalls.Add(1) + w.WriteHeader(http.StatusOK) + default: + http.NotFound(w, r) + } + })) + defer backendServer.Close() + withTestConfig(t, backendServer.URL) + + err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), + &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}, + "11111111-2222-4333-8444-555555555555") + if err != nil { + t.Fatalf("uploadToS3 must succeed when confirm-upload returns 5xx, got: %v", err) + } + if got := notifyCalls.Load(); got != 1 { + t.Errorf("notify must still be called when confirm-upload returns 5xx, got %d call(s)", got) + } +} From 16d58c377930f5a9d05b6c958d8ea944c9974660 Mon Sep 17 00:00:00 2001 From: Swarit Pandey Date: Thu, 14 May 2026 05:52:17 +0530 Subject: [PATCH 2/2] chore: address comments Signed-off-by: Swarit Pandey --- internal/telemetry/telemetry.go | 180 ++++++++-------- internal/telemetry/telemetry_test.go | 308 +++++++++++++++++---------- 2 files changed, 295 insertions(+), 193 deletions(-) diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 9735c6b..2255c71 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -25,6 +25,11 @@ import ( "github.com/step-security/dev-machine-guard/internal/progress" ) 
+// s3UploadBackoffUnit is multiplied by attempt-number to compute the +// inter-attempt sleep on S3 PUT retries. Lifted to a package var so tests +// can shrink it; production code never mutates it. +var s3UploadBackoffUnit = 2 * time.Second + // Payload is the enterprise telemetry JSON structure. type Payload struct { CustomerID string `json:"customer_id"` @@ -656,7 +661,9 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe log.Progress("Uploading telemetry to S3 (%d bytes)...", len(compressedPayload)) s3Client := &http.Client{Timeout: 10 * time.Minute} const maxRetries = 3 - var putResp *http.Response + backoffUnit := s3UploadBackoffUnit + uploaded := false + var lastFailure string for attempt := 1; attempt <= maxRetries; attempt++ { uploadStart := time.Now() putReq, reqErr := http.NewRequestWithContext(ctx, http.MethodPut, urlResp.UploadURL, bytes.NewReader(compressedPayload)) @@ -665,78 +672,78 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe } putReq.Header.Set("Content-Type", "application/json") - putResp, err = s3Client.Do(putReq) + putResp, putErr := s3Client.Do(putReq) elapsed := time.Since(uploadStart) - if err != nil { - log.Debug("s3 PUT attempt %d/%d: error=%v elapsed=%s", attempt, maxRetries, err, elapsed) + if putErr != nil { + log.Debug("s3 PUT attempt %d/%d: error=%v elapsed=%s", attempt, maxRetries, putErr, elapsed) + lastFailure = fmt.Sprintf("S3 PUT error: %v", putErr) } else { log.Debug("s3 PUT attempt %d/%d: status=%d elapsed=%s payload_bytes=%d", attempt, maxRetries, putResp.StatusCode, elapsed, len(payloadJSON)) } - if err == nil && putResp.StatusCode == http.StatusOK { + if putErr == nil && putResp.StatusCode == http.StatusOK { // A real S3 PUT response always carries x-amz-request-id and - // x-amz-id-2 headers. 
If both are missing on a "200 OK", the - // response did not originate from AWS — typically a TLS-inspecting - // proxy, DLP appliance, or outbound-filtering firewall has - // terminated the connection and synthesized a success without - // forwarding the body to S3. Treat as a non-retryable failure so - // the run is reported as failed instead of cheerfully claiming - // success on bytes that never reached AWS. + // x-amz-id-2. If both are missing, the response did not + // originate from AWS — typically a TLS-inspecting proxy or + // outbound-filtering firewall has terminated the connection + // and synthesized a success without forwarding the body. Ask + // the backend whether the object actually landed in S3 before + // trusting this 200. reqID := putResp.Header.Get("x-amz-request-id") id2 := putResp.Header.Get("x-amz-id-2") - if reqID == "" && id2 == "" { - proxyHint := putResp.Header.Get("Server") - _, _ = io.Copy(io.Discard, putResp.Body) - _ = putResp.Body.Close() - return fmt.Errorf("S3 PUT returned 200 but the response is not from AWS (missing x-amz-request-id and x-amz-id-2; Server=%q) — the upload likely did not reach S3, possibly intercepted by a TLS-inspecting proxy or outbound firewall on this network", proxyHint) + proxyHint := putResp.Header.Get("Server") + _, _ = io.Copy(io.Discard, putResp.Body) + _ = putResp.Body.Close() + + if reqID != "" || id2 != "" { + log.Progress("Uploaded to S3 in %s", elapsed) + uploaded = true + break } - log.Progress("Uploaded to S3 in %s", elapsed) - break - } - // Clean up response body before retry - if putResp != nil { + log.Warn("S3 PUT returned 200 without AWS request id headers (Server=%q) — verifying with backend", proxyHint) + result, reason := checkUploadInS3(ctx, log, client, urlResp.S3Key, payload.DeviceID) + switch result { + case uploadCheckConfirmed: + log.Progress("Uploaded to S3 in %s (verified by backend)", elapsed) + uploaded = true + case uploadCheckUnsupported: + // Backend predates confirm-upload 
— we can't verify, so + // trust the 200 for compatibility. The notify endpoint's + // own precheck is the remaining safety net. + log.Debug("backend does not support confirm-upload; proceeding on the 200 alone") + log.Progress("Uploaded to S3 in %s", elapsed) + uploaded = true + case uploadCheckMissing: + lastFailure = fmt.Sprintf("backend confirmed the object is not in S3 (reason=%s)", reason) + case uploadCheckIndeterminate: + lastFailure = "backend could not verify the upload" + } + if uploaded { + break + } + } else if putResp != nil { _, _ = io.Copy(io.Discard, putResp.Body) _ = putResp.Body.Close() + lastFailure = fmt.Sprintf("S3 PUT returned status %d", putResp.StatusCode) } if attempt == maxRetries { - if err != nil { - return fmt.Errorf("uploading to S3 (payload: %d bytes, elapsed: %s, attempts: %d): %w", - len(compressedPayload), elapsed, maxRetries, err) - } - return fmt.Errorf("S3 upload failed with status %d (payload: %d bytes, attempts: %d)", - putResp.StatusCode, len(compressedPayload), maxRetries) + break } - // Log retry and backoff - backoff := time.Duration(attempt) * 2 * time.Second - if err != nil { - log.Warn("S3 upload attempt %d/%d failed after %s: %v; retrying in %s...", attempt, maxRetries, elapsed, err, backoff) - } else { - log.Warn("S3 upload attempt %d/%d got status %d, retrying in %s...", attempt, maxRetries, putResp.StatusCode, backoff) - } + backoff := time.Duration(attempt) * backoffUnit + log.Warn("S3 upload attempt %d/%d failed (%s); retrying in %s...", attempt, maxRetries, lastFailure, backoff) select { case <-time.After(backoff): case <-ctx.Done(): return ctx.Err() } } - defer func() { _ = putResp.Body.Close() }() - _, _ = io.Copy(io.Discard, putResp.Body) - // Ask the backend to confirm the object actually exists in S3 before - // proceeding to notify. 
This catches the rare case where the PUT - // returned an AWS-headers-bearing 200 but the bytes still failed to - // persist, and gives the user a definitive line in the progress log - // independent of "backend processing started". - // - // The endpoint is best-effort from the agent's point of view: a - // definitive uploaded=false is fatal, but transport errors or unknown - // status codes (e.g. an old backend that doesn't expose this route) - // fall through to notify, whose own precheck is the safety net. - if err := confirmUploadInS3(ctx, log, client, urlResp.S3Key, payload.DeviceID); err != nil { - return err + if !uploaded { + return fmt.Errorf("telemetry upload failed after %d attempts: %s (payload: %d bytes) — the network may be intercepting outbound traffic to S3 (TLS-inspecting proxy, DLP appliance, or outbound firewall)", + maxRetries, lastFailure, len(compressedPayload)) } // Notify backend @@ -774,14 +781,29 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe return nil } -// confirmUploadInS3 calls the backend's /telemetry/confirm-upload endpoint -// to get a definitive uploaded/not-uploaded answer for the s3_key the -// agent was issued. Returns a non-nil error only when the backend -// confirms the object is missing (uploaded=false) — anything else -// (transport failure, non-200, parse error, old backend without the -// route) is logged and treated as non-fatal so the run still proceeds -// to notify, which has its own precheck as a safety net. -func confirmUploadInS3(ctx context.Context, log *progress.Logger, client *http.Client, s3Key, deviceID string) error { +// uploadCheckResult is the four-valued answer the agent gets back when it +// asks the backend whether a PUT'd s3_key actually landed in S3. +type uploadCheckResult int + +const ( + // uploadCheckConfirmed = backend HEAD'd the object and it exists. 
+ uploadCheckConfirmed uploadCheckResult = iota + // uploadCheckMissing = backend HEAD'd the object and it does not exist. + uploadCheckMissing + // uploadCheckUnsupported = backend predates the confirm-upload route + // (HTTP 404). We can't verify; for compatibility callers should trust + // the original PUT response. + uploadCheckUnsupported + // uploadCheckIndeterminate = transient failure (5xx, transport error, + // parse error). The answer is unknown; callers should retry. + uploadCheckIndeterminate +) + +// checkUploadInS3 calls the backend's /telemetry/confirm-upload endpoint +// and translates the response into a uploadCheckResult. On +// uploadCheckMissing the second return value carries the backend's +// reason string (e.g. "object_not_found"). +func checkUploadInS3(ctx context.Context, log *progress.Logger, client *http.Client, s3Key, deviceID string) (uploadCheckResult, string) { log.Progress("Confirming upload reached S3...") body, _ := json.Marshal(map[string]string{ @@ -793,8 +815,8 @@ func confirmUploadInS3(ctx context.Context, log *progress.Logger, client *http.C req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) if err != nil { - log.Warn("confirm-upload: request build failed (%v); proceeding to notify", err) - return nil + log.Warn("confirm-upload: request build failed: %v", err) + return uploadCheckIndeterminate, "" } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+config.APIKey) @@ -802,22 +824,19 @@ func confirmUploadInS3(ctx context.Context, log *progress.Logger, client *http.C resp, err := client.Do(req) if err != nil { - log.Warn("confirm-upload: request failed (%v); proceeding to notify", err) - return nil + log.Warn("confirm-upload: request failed: %v", err) + return uploadCheckIndeterminate, "" } defer func() { _ = resp.Body.Close() }() if resp.StatusCode == http.StatusNotFound { - // Backend predates the confirm-upload endpoint. 
Fine — notify's - // precheck still catches a missing object server-side. - log.Debug("confirm-upload: endpoint not available on this backend; falling back to notify-only flow") _, _ = io.Copy(io.Discard, resp.Body) - return nil + return uploadCheckUnsupported, "" } if resp.StatusCode != http.StatusOK { - log.Warn("confirm-upload: HTTP %d; proceeding to notify", resp.StatusCode) + log.Warn("confirm-upload: HTTP %d", resp.StatusCode) _, _ = io.Copy(io.Discard, resp.Body) - return nil + return uploadCheckIndeterminate, "" } var result struct { @@ -827,24 +846,19 @@ func confirmUploadInS3(ctx context.Context, log *progress.Logger, client *http.C S3Key string `json:"s3_key"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - log.Warn("confirm-upload: malformed response (%v); proceeding to notify", err) - return nil - } - - if !result.Uploaded { - // Definitive negative answer from the backend — the object the - // agent thought it uploaded is not in S3. Stop here so the run - // is recorded as failed with a specific reason, rather than - // queueing a doomed worker job. - reason := result.Reason - if reason == "" { - reason = "unknown" - } - return fmt.Errorf("backend reports upload not in S3 (reason=%s, s3_key=%s) — the PUT returned success but no object exists; likely intercepted upstream", reason, s3Key) + log.Warn("confirm-upload: malformed response: %v", err) + return uploadCheckIndeterminate, "" } - log.Progress("Upload confirmed in S3 (%d bytes)", result.SizeBytes) - return nil + if result.Uploaded { + log.Debug("confirm-upload: backend reports object present (%d bytes)", result.SizeBytes) + return uploadCheckConfirmed, "" + } + reason := result.Reason + if reason == "" { + reason = "unknown" + } + return uploadCheckMissing, reason } // gzipBytes returns a gzip-compressed copy of the input bytes. 
diff --git a/internal/telemetry/telemetry_test.go b/internal/telemetry/telemetry_test.go index afb80d1..98c614b 100644 --- a/internal/telemetry/telemetry_test.go +++ b/internal/telemetry/telemetry_test.go @@ -12,6 +12,7 @@ import ( "sync" "sync/atomic" "testing" + "time" "github.com/step-security/dev-machine-guard/internal/config" "github.com/step-security/dev-machine-guard/internal/progress" @@ -49,10 +50,11 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { putContentType string notifyBody []byte ) + var confirmCalls atomic.Int32 // Mock S3 PUT endpoint — captures the body the agent uploads. // Emits x-amz-request-id so the client's "real AWS response" check - // (uploadToS3 in telemetry.go) treats this 200 as genuine. + // treats this 200 as genuine and skips the confirm step. s3Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, _ := io.ReadAll(r.Body) mu.Lock() @@ -65,7 +67,6 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { })) defer s3Server.Close() - // Mock backend — handles upload-URL, confirm-upload, and process-uploaded. 
backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"): @@ -78,11 +79,8 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", }) case strings.HasSuffix(r.URL.Path, "/telemetry/confirm-upload"): - _ = json.NewEncoder(w).Encode(map[string]any{ - "uploaded": true, - "size_bytes": 4242, - "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", - }) + confirmCalls.Add(1) + _ = json.NewEncoder(w).Encode(map[string]any{"uploaded": true, "size_bytes": 4242}) case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): body, _ := io.ReadAll(r.Body) mu.Lock() @@ -95,29 +93,25 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { })) defer backendServer.Close() - // Override config globals for the duration of the test. - origEndpoint, origCustomer, origKey := config.APIEndpoint, config.CustomerID, config.APIKey - config.APIEndpoint = backendServer.URL - config.CustomerID = "test-customer" - config.APIKey = "test-key" - defer func() { - config.APIEndpoint, config.CustomerID, config.APIKey = origEndpoint, origCustomer, origKey - }() + withTestConfig(t, backendServer.URL) - payload := &Payload{ - CustomerID: "test-customer", - DeviceID: "dev-1", - } + payload := &Payload{CustomerID: "test-customer", DeviceID: "dev-1"} const testExecutionID = "11111111-2222-4333-8444-555555555555" if err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), payload, testExecutionID); err != nil { t.Fatalf("uploadToS3 failed: %v", err) } + // On a clean AWS-headered 200, the agent must NOT consult confirm-upload — + // that endpoint exists only to disambiguate suspicious (no-AWS-header) PUT + // responses. 
+ if got := confirmCalls.Load(); got != 0 { + t.Errorf("confirm-upload must not be called on a clean AWS-headered 200, got %d call(s)", got) + } + mu.Lock() defer mu.Unlock() - // Upload-URL request body must include is_compressed: true. var uploadReq map[string]any if err := json.Unmarshal(uploadURLBody, &uploadReq); err != nil { t.Fatalf("failed to parse upload-URL request body: %v", err) @@ -129,7 +123,6 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { t.Errorf("expected is_compressed=true, got %v", uploadReq["is_compressed"]) } - // PUT body must be gzip-compressed. if len(putBody) < 2 || putBody[0] != 0x1f || putBody[1] != 0x8b { t.Fatalf("expected gzip-compressed PUT body (got %d bytes)", len(putBody)) } @@ -137,7 +130,6 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { t.Errorf("expected Content-Type application/json (matches presigned URL), got %q", putContentType) } - // Decompressing the PUT body should yield the original JSON payload. gz, err := gzip.NewReader(bytes.NewReader(putBody)) if err != nil { t.Fatalf("PUT body is not valid gzip: %v", err) @@ -155,7 +147,6 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { t.Errorf("decompressed payload device_id mismatch: got %q", roundTrip.DeviceID) } - // Notify-backend was called with the s3_key returned from the upload-URL endpoint. var notify map[string]string if err := json.Unmarshal(notifyBody, &notify); err != nil { t.Fatalf("failed to parse notify body: %v", err) @@ -168,17 +159,17 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { } } -// TestUploadToS3_RejectsSynthetic200WithoutAWSHeaders simulates a TLS-inspecting -// proxy / DLP appliance that terminates the agent's outbound PUT to S3 and -// returns a synthetic "200 OK" without forwarding the body.
A real S3 response -// always carries x-amz-request-id and x-amz-id-2 — the agent must treat a 200 -// missing both as an upload failure rather than silently calling notify with -// an s3_key whose object was never persisted. -func TestUploadToS3_RejectsSynthetic200WithoutAWSHeaders(t *testing.T) { - var notifyCalls atomic.Int32 +// TestUploadToS3_Synthetic200ConfirmedByBackend covers the case where a TLS +// proxy strips AWS response headers but the bytes still made it to S3 (e.g. +// a transparent proxy that doesn't terminate the body, just rewrites the +// response). The backend confirms the object is present, so the upload is +// accepted and notify proceeds. +func TestUploadToS3_Synthetic200ConfirmedByBackend(t *testing.T) { + var ( + notifyCalls atomic.Int32 + confirmCalls atomic.Int32 + ) - // Mock "proxy" responding 200 with no AWS headers — what a corporate - // TLS interceptor produces when it terminates the PUT in-flight. fakeProxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "fake-proxy/1.0") w.WriteHeader(http.StatusOK) @@ -192,6 +183,9 @@ func TestUploadToS3_RejectsSynthetic200WithoutAWSHeaders(t *testing.T) { "upload_url": fakeProxy.URL + "/put", "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", }) + case strings.HasSuffix(r.URL.Path, "/telemetry/confirm-upload"): + confirmCalls.Add(1) + _ = json.NewEncoder(w).Encode(map[string]any{"uploaded": true, "size_bytes": 4242}) case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): notifyCalls.Add(1) w.WriteHeader(http.StatusOK) @@ -200,77 +194,56 @@ func TestUploadToS3_RejectsSynthetic200WithoutAWSHeaders(t *testing.T) { } })) defer backendServer.Close() + withTestConfig(t, backendServer.URL) - origEndpoint, origCustomer, origKey := config.APIEndpoint, config.CustomerID, config.APIKey - config.APIEndpoint = backendServer.URL - config.CustomerID = "test-customer" - config.APIKey = "test-key" - defer func() { - 
config.APIEndpoint, config.CustomerID, config.APIKey = origEndpoint, origCustomer, origKey - }() - - payload := &Payload{CustomerID: "test-customer", DeviceID: "dev-1"} - err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), payload, "11111111-2222-4333-8444-555555555555") - if err == nil { - t.Fatalf("uploadToS3 should fail when the PUT response is missing AWS request id headers") - } - if !strings.Contains(err.Error(), "not from AWS") { - t.Errorf("expected error to mention 'not from AWS', got: %v", err) + err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), + &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}, + "11111111-2222-4333-8444-555555555555") + if err != nil { + t.Fatalf("uploadToS3 must succeed when backend confirms uploaded=true, got: %v", err) } - if !strings.Contains(err.Error(), "fake-proxy/1.0") { - t.Errorf("expected error to include the Server header hint, got: %v", err) + if got := confirmCalls.Load(); got != 1 { + t.Errorf("expected exactly one confirm-upload call (triggered by missing AWS headers), got %d", got) } - if got := notifyCalls.Load(); got != 0 { - t.Errorf("notify endpoint must not be called when upload is rejected, got %d call(s)", got) + if got := notifyCalls.Load(); got != 1 { + t.Errorf("notify must be called once on confirmed upload, got %d", got) } } -// newAWSHeaderS3Server returns an httptest server that responds 200 to PUTs -// with the AWS request id headers a real S3 sets, so the agent's response -// verification accepts it. 
-func newAWSHeaderS3Server(t *testing.T) *httptest.Server { - t.Helper() - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("x-amz-request-id", "TESTREQID000000") - w.Header().Set("x-amz-id-2", "test-id-2") - w.WriteHeader(http.StatusOK) - })) -} - -func withTestConfig(t *testing.T, endpoint string) { - t.Helper() - origEndpoint, origCustomer, origKey := config.APIEndpoint, config.CustomerID, config.APIKey - config.APIEndpoint = endpoint - config.CustomerID = "test-customer" - config.APIKey = "test-key" - t.Cleanup(func() { - config.APIEndpoint, config.CustomerID, config.APIKey = origEndpoint, origCustomer, origKey - }) -} +// TestUploadToS3_Synthetic200MissingExhaustsRetries covers a hard +// TLS-interception failure: the proxy synthesizes 200s without forwarding +// bytes, and the backend definitively reports the object never landed. +// The agent must retry up to maxRetries and then fail loudly so the run +// is recorded as failed with a clear reason instead of cheerfully calling +// notify on a phantom object. +func TestUploadToS3_Synthetic200MissingExhaustsRetries(t *testing.T) { + withFastBackoff(t) -// TestUploadToS3_ConfirmUploadFalseIsFatal exercises the "backend confirms -// the object is not in S3" branch — the PUT response looked real (AWS -// headers present) but the backend HEAD says nothing landed. The agent -// must bail and not call notify, because notify would only re-discover -// the same fact moments later. 
-func TestUploadToS3_ConfirmUploadFalseIsFatal(t *testing.T) { - var notifyCalls atomic.Int32 + var ( + notifyCalls atomic.Int32 + confirmCalls atomic.Int32 + putCalls atomic.Int32 + ) - s3Server := newAWSHeaderS3Server(t) - defer s3Server.Close() + fakeProxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + putCalls.Add(1) + w.Header().Set("Server", "fake-proxy/1.0") + w.WriteHeader(http.StatusOK) + })) + defer fakeProxy.Close() backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"): _ = json.NewEncoder(w).Encode(map[string]string{ - "upload_url": s3Server.URL + "/put", + "upload_url": fakeProxy.URL + "/put", "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", }) case strings.HasSuffix(r.URL.Path, "/telemetry/confirm-upload"): + confirmCalls.Add(1) _ = json.NewEncoder(w).Encode(map[string]any{ "uploaded": false, "reason": "object_not_found", - "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", }) case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): notifyCalls.Add(1) @@ -286,38 +259,47 @@ func TestUploadToS3_ConfirmUploadFalseIsFatal(t *testing.T) { &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}, "11111111-2222-4333-8444-555555555555") if err == nil { - t.Fatal("uploadToS3 must fail when backend confirms upload is not in S3") + t.Fatal("uploadToS3 must fail when every confirm reports the object missing") } - if !strings.Contains(err.Error(), "not in S3") { - t.Errorf("expected error to mention 'not in S3', got: %v", err) + if !strings.Contains(err.Error(), "telemetry upload failed after 3 attempts") { + t.Errorf("expected error to mention attempt exhaustion, got: %v", err) } if !strings.Contains(err.Error(), "object_not_found") { - t.Errorf("expected error to include the backend reason, got: %v", err) + t.Errorf("expected error to include the backend's reason, got: %v", err) + 
} + if got := putCalls.Load(); got != 3 { + t.Errorf("expected 3 PUT attempts, got %d", got) + } + if got := confirmCalls.Load(); got != 3 { + t.Errorf("expected 3 confirm-upload calls (one per attempt), got %d", got) } if got := notifyCalls.Load(); got != 0 { - t.Errorf("notify must not be called when confirm reports uploaded=false, got %d call(s)", got) + t.Errorf("notify must not be called when the upload was never confirmed, got %d", got) } } -// TestUploadToS3_ConfirmUpload404FallsThroughToNotify covers compatibility -// with older backends that don't expose /telemetry/confirm-upload yet. -// A 404 on confirm must NOT fail the run — notify still gets called and -// the upload completes via the existing flow. -func TestUploadToS3_ConfirmUpload404FallsThroughToNotify(t *testing.T) { +// TestUploadToS3_Synthetic200UnsupportedBackendTrustsPUT covers an agent +// running against a backend that predates the confirm-upload endpoint. +// We have no way to verify, so the suspicion-triggered check falls back +// to trusting the original 200 — matches pre-PR behavior for compatibility. +func TestUploadToS3_Synthetic200UnsupportedBackendTrustsPUT(t *testing.T) { var notifyCalls atomic.Int32 - s3Server := newAWSHeaderS3Server(t) - defer s3Server.Close() + fakeProxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "fake-proxy/1.0") + w.WriteHeader(http.StatusOK) + })) + defer fakeProxy.Close() backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"): _ = json.NewEncoder(w).Encode(map[string]string{ - "upload_url": s3Server.URL + "/put", + "upload_url": fakeProxy.URL + "/put", "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", }) case strings.HasSuffix(r.URL.Path, "/telemetry/confirm-upload"): - // Simulate an old backend that doesn't know this route. + // Old backend — endpoint not present. 
http.NotFound(w, r) case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): notifyCalls.Add(1) @@ -336,27 +318,37 @@ func TestUploadToS3_ConfirmUpload404FallsThroughToNotify(t *testing.T) { t.Fatalf("uploadToS3 must succeed when confirm-upload is unsupported (404), got: %v", err) } if got := notifyCalls.Load(); got != 1 { - t.Errorf("notify must still be called when confirm-upload returns 404, got %d call(s)", got) + t.Errorf("notify must still be called when confirm-upload returns 404, got %d", got) } } -// TestUploadToS3_ConfirmUpload5xxFallsThroughToNotify covers transient -// backend failure of the confirm endpoint. The agent must not fail the -// run for that — notify still has its own server-side precheck. -func TestUploadToS3_ConfirmUpload5xxFallsThroughToNotify(t *testing.T) { - var notifyCalls atomic.Int32 +// TestUploadToS3_Synthetic200IndeterminateExhausts covers a confirm +// endpoint that returns 5xx (e.g. its S3 HEAD failed). We can't tell +// whether the object landed; since we were already suspicious, retry — +// and fail if every attempt is indeterminate. 
+func TestUploadToS3_Synthetic200IndeterminateExhausts(t *testing.T) { + withFastBackoff(t) - s3Server := newAWSHeaderS3Server(t) - defer s3Server.Close() + var ( + notifyCalls atomic.Int32 + confirmCalls atomic.Int32 + ) + + fakeProxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "fake-proxy/1.0") + w.WriteHeader(http.StatusOK) + })) + defer fakeProxy.Close() backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"): _ = json.NewEncoder(w).Encode(map[string]string{ - "upload_url": s3Server.URL + "/put", + "upload_url": fakeProxy.URL + "/put", "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", }) case strings.HasSuffix(r.URL.Path, "/telemetry/confirm-upload"): + confirmCalls.Add(1) w.WriteHeader(http.StatusBadGateway) _, _ = w.Write([]byte(`{"error":"s3_check_failed"}`)) case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): @@ -369,13 +361,109 @@ func TestUploadToS3_ConfirmUpload5xxFallsThroughToNotify(t *testing.T) { defer backendServer.Close() withTestConfig(t, backendServer.URL) + err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), + &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}, + "11111111-2222-4333-8444-555555555555") + if err == nil { + t.Fatal("uploadToS3 must fail when every confirm is indeterminate") + } + if !strings.Contains(err.Error(), "could not verify the upload") { + t.Errorf("expected error to mention verification failure, got: %v", err) + } + if got := confirmCalls.Load(); got != 3 { + t.Errorf("expected 3 confirm-upload attempts, got %d", got) + } + if got := notifyCalls.Load(); got != 0 { + t.Errorf("notify must not be called on indeterminate verification, got %d", got) + } +} + +// TestUploadToS3_Synthetic200ThenRealAWSHeaders covers a flaky proxy +// scenario: the first PUT is intercepted (no AWS headers, 
backend says +// missing), but the retry routes around the proxy and S3 responds for +// real. The upload succeeds without ever needing a notify-time precheck. +func TestUploadToS3_Synthetic200ThenRealAWSHeaders(t *testing.T) { + withFastBackoff(t) + + var ( + notifyCalls atomic.Int32 + confirmCalls atomic.Int32 + putAttempt atomic.Int32 + ) + + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := putAttempt.Add(1) + if n == 1 { + // First attempt: intercepted by proxy. + w.Header().Set("Server", "fake-proxy/1.0") + w.WriteHeader(http.StatusOK) + return + } + // Subsequent attempts: real AWS response. + w.Header().Set("x-amz-request-id", "TESTREQID000000") + w.Header().Set("x-amz-id-2", "test-id-2") + w.WriteHeader(http.StatusOK) + })) + defer upstream.Close() + + backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"): + _ = json.NewEncoder(w).Encode(map[string]string{ + "upload_url": upstream.URL + "/put", + "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", + }) + case strings.HasSuffix(r.URL.Path, "/telemetry/confirm-upload"): + confirmCalls.Add(1) + // Object hasn't landed yet (first PUT was intercepted). 
+ _ = json.NewEncoder(w).Encode(map[string]any{ + "uploaded": false, + "reason": "object_not_found", + }) + case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): + notifyCalls.Add(1) + w.WriteHeader(http.StatusOK) + default: + http.NotFound(w, r) + } + })) + defer backendServer.Close() + withTestConfig(t, backendServer.URL) + err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}, "11111111-2222-4333-8444-555555555555") if err != nil { - t.Fatalf("uploadToS3 must succeed when confirm-upload returns 5xx, got: %v", err) + t.Fatalf("uploadToS3 must recover when a later attempt reaches real S3, got: %v", err) + } + if got := putAttempt.Load(); got != 2 { + t.Errorf("expected 2 PUT attempts (1 intercepted, 1 real), got %d", got) + } + if got := confirmCalls.Load(); got != 1 { + t.Errorf("expected 1 confirm-upload call (only the first attempt was suspicious), got %d", got) } if got := notifyCalls.Load(); got != 1 { - t.Errorf("notify must still be called when confirm-upload returns 5xx, got %d call(s)", got) + t.Errorf("notify must be called once after successful upload, got %d", got) } } + +func withTestConfig(t *testing.T, endpoint string) { + t.Helper() + origEndpoint, origCustomer, origKey := config.APIEndpoint, config.CustomerID, config.APIKey + config.APIEndpoint = endpoint + config.CustomerID = "test-customer" + config.APIKey = "test-key" + t.Cleanup(func() { + config.APIEndpoint, config.CustomerID, config.APIKey = origEndpoint, origCustomer, origKey + }) +} + +// withFastBackoff shrinks the inter-attempt backoff so retry-exhaustion +// tests run in milliseconds instead of seconds. Production code leaves +// the unit at its default of 2s. +func withFastBackoff(t *testing.T) { + t.Helper() + orig := s3UploadBackoffUnit + s3UploadBackoffUnit = 5 * time.Millisecond + t.Cleanup(func() { s3UploadBackoffUnit = orig }) +}