diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 82a9e72..9735c6b 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -674,6 +674,22 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe } if err == nil && putResp.StatusCode == http.StatusOK { + // A real S3 PUT response always carries x-amz-request-id and + // x-amz-id-2 headers. If both are missing on a "200 OK", the + // response did not originate from AWS — typically a TLS-inspecting + // proxy, DLP appliance, or outbound-filtering firewall has + // terminated the connection and synthesized a success without + // forwarding the body to S3. Treat as a non-retryable failure so + // the run is reported as failed instead of cheerfully claiming + // success on bytes that never reached AWS. + reqID := putResp.Header.Get("x-amz-request-id") + id2 := putResp.Header.Get("x-amz-id-2") + if reqID == "" && id2 == "" { + proxyHint := putResp.Header.Get("Server") + _, _ = io.Copy(io.Discard, putResp.Body) + _ = putResp.Body.Close() + return fmt.Errorf("S3 PUT returned 200 but the response is not from AWS (missing x-amz-request-id and x-amz-id-2; Server=%q) — the upload likely did not reach S3, possibly intercepted by a TLS-inspecting proxy or outbound firewall on this network", proxyHint) + } log.Progress("Uploaded to S3 in %s", elapsed) break } @@ -709,6 +725,20 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe defer func() { _ = putResp.Body.Close() }() _, _ = io.Copy(io.Discard, putResp.Body) + // Ask the backend to confirm the object actually exists in S3 before + // proceeding to notify. This catches the rare case where the PUT + // returned an AWS-headers-bearing 200 but the bytes still failed to + // persist, and gives the user a definitive line in the progress log + // independent of "backend processing started". 
+ // + // The endpoint is best-effort from the agent's point of view: a + // definitive uploaded=false is fatal, but transport errors or unknown + // status codes (e.g. an old backend that doesn't expose this route) + // fall through to notify, whose own precheck is the safety net. + if err := confirmUploadInS3(ctx, log, client, urlResp.S3Key, payload.DeviceID); err != nil { + return err + } + // Notify backend log.Progress("Notifying backend of upload...") notifyBody, _ := json.Marshal(map[string]string{ @@ -744,6 +774,79 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe return nil } +// confirmUploadInS3 calls the backend's /telemetry/confirm-upload endpoint +// to get a definitive uploaded/not-uploaded answer for the s3_key the +// agent was issued. Returns a non-nil error only when the backend +// confirms the object is missing (uploaded=false) — anything else +// (transport failure, non-200, parse error, old backend without the +// route) is logged and treated as non-fatal so the run still proceeds +// to notify, which has its own precheck as a safety net. 
+func confirmUploadInS3(ctx context.Context, log *progress.Logger, client *http.Client, s3Key, deviceID string) error {
+	log.Progress("Confirming upload reached S3...")
+
+	body, _ := json.Marshal(map[string]string{"s3_key": s3Key, "device_id": deviceID}) // string map: cannot fail
+	endpoint := fmt.Sprintf("%s/v1/%s/developer-mdm-agent/telemetry/confirm-upload",
+		config.APIEndpoint, config.CustomerID)
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
+	if err != nil {
+		log.Warn("confirm-upload: request build failed (%v); proceeding to notify", err)
+		return nil
+	}
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Authorization", "Bearer "+config.APIKey)
+	req.Header.Set("X-Agent-Version", buildinfo.Version)
+
+	resp, err := client.Do(req)
+	if err != nil {
+		log.Warn("confirm-upload: request failed (%v); proceeding to notify", err)
+		return nil
+	}
+	// Drain before closing on every return path so the connection can be reused.
+	defer func() {
+		_, _ = io.Copy(io.Discard, resp.Body)
+		_ = resp.Body.Close()
+	}()
+
+	if resp.StatusCode == http.StatusNotFound {
+		// Backend predates the confirm-upload endpoint. Fine — notify's
+		// precheck still catches a missing object server-side.
+		log.Debug("confirm-upload: endpoint not available on this backend; falling back to notify-only flow")
+		return nil
+	}
+	if resp.StatusCode != http.StatusOK {
+		log.Warn("confirm-upload: HTTP %d; proceeding to notify", resp.StatusCode)
+		return nil
+	}
+
+	// Uploaded is a pointer so a 200 body without the field ({} or null) is not mistaken for uploaded=false.
+	var result struct {
+		Uploaded  *bool  `json:"uploaded"`
+		SizeBytes int64  `json:"size_bytes"`
+		Reason    string `json:"reason"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
+		log.Warn("confirm-upload: malformed response (%v); proceeding to notify", err)
+		return nil
+	}
+	if result.Uploaded == nil {
+		log.Warn("confirm-upload: response missing %q field; proceeding to notify", "uploaded")
+		return nil
+	}
+
+	if !*result.Uploaded {
+		// Definitive negative from the backend: fail the run here rather than queueing a doomed worker job.
+		reason := result.Reason
+		if reason == "" {
+			reason = "unknown"
+		}
+		return fmt.Errorf("backend reports upload not in S3 (reason=%s, s3_key=%s) — the PUT returned success but no object exists; likely intercepted upstream", reason, s3Key)
+	}
+
+	log.Progress("Upload confirmed in S3 (%d bytes)", result.SizeBytes)
+	return nil
+}
+
 // gzipBytes returns a gzip-compressed copy of the input bytes.
 func gzipBytes(data []byte) ([]byte, error) {
 	var buf bytes.Buffer
diff --git a/internal/telemetry/telemetry_test.go b/internal/telemetry/telemetry_test.go
index 4c7f8fd..afb80d1 100644
--- a/internal/telemetry/telemetry_test.go
+++ b/internal/telemetry/telemetry_test.go
@@ -10,6 +10,7 @@ import (
 	"net/http/httptest"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 
 	"github.com/step-security/dev-machine-guard/internal/config"
@@ -50,17 +51,21 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) {
 	)
 
 	// Mock S3 PUT endpoint — captures the body the agent uploads.
+	// Emits x-amz-request-id so the client's "real AWS response" check
+	// (uploadToS3 in telemetry.go) treats this 200 as genuine.
 	s3Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		body, _ := io.ReadAll(r.Body)
 		mu.Lock()
 		putBody = body
 		putContentType = r.Header.Get("Content-Type")
 		mu.Unlock()
+		w.Header().Set("x-amz-request-id", "TESTREQID000000")
+		w.Header().Set("x-amz-id-2", "test-id-2")
 		w.WriteHeader(http.StatusOK)
 	}))
 	defer s3Server.Close()
 
-	// Mock backend — handles upload-URL and process-uploaded calls.
+	// Mock backend — handles upload-URL, confirm-upload, and process-uploaded.
backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"): @@ -72,6 +77,12 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { "upload_url": s3Server.URL + "/put", "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", }) + case strings.HasSuffix(r.URL.Path, "/telemetry/confirm-upload"): + _ = json.NewEncoder(w).Encode(map[string]any{ + "uploaded": true, + "size_bytes": 4242, + "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", + }) case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): body, _ := io.ReadAll(r.Body) mu.Lock() @@ -156,3 +167,215 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) { t.Errorf("expected execution_id=%q in notify body, got %q", testExecutionID, notify["execution_id"]) } } + +// TestUploadToS3_RejectsSynthetic200WithoutAWSHeaders simulates a TLS-inspecting +// proxy / DLP appliance that terminates the agent's outbound PUT to S3 and +// returns a synthetic "200 OK" without forwarding the body. A real S3 response +// always carries x-amz-request-id and x-amz-id-2 — the agent must treat a 200 +// missing both as an upload failure rather than silently calling notify with +// an s3_key whose object was never persisted. +func TestUploadToS3_RejectsSynthetic200WithoutAWSHeaders(t *testing.T) { + var notifyCalls atomic.Int32 + + // Mock "proxy" responding 200 with no AWS headers — what a corporate + // TLS interceptor produces when it terminates the PUT in-flight. 
+ fakeProxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "fake-proxy/1.0") + w.WriteHeader(http.StatusOK) + })) + defer fakeProxy.Close() + + backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"): + _ = json.NewEncoder(w).Encode(map[string]string{ + "upload_url": fakeProxy.URL + "/put", + "s3_key": "developer-mdm/test-customer/dev-1/123.json.gz", + }) + case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"): + notifyCalls.Add(1) + w.WriteHeader(http.StatusOK) + default: + http.NotFound(w, r) + } + })) + defer backendServer.Close() + + origEndpoint, origCustomer, origKey := config.APIEndpoint, config.CustomerID, config.APIKey + config.APIEndpoint = backendServer.URL + config.CustomerID = "test-customer" + config.APIKey = "test-key" + defer func() { + config.APIEndpoint, config.CustomerID, config.APIKey = origEndpoint, origCustomer, origKey + }() + + payload := &Payload{CustomerID: "test-customer", DeviceID: "dev-1"} + err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), payload, "11111111-2222-4333-8444-555555555555") + if err == nil { + t.Fatalf("uploadToS3 should fail when the PUT response is missing AWS request id headers") + } + if !strings.Contains(err.Error(), "not from AWS") { + t.Errorf("expected error to mention 'not from AWS', got: %v", err) + } + if !strings.Contains(err.Error(), "fake-proxy/1.0") { + t.Errorf("expected error to include the Server header hint, got: %v", err) + } + if got := notifyCalls.Load(); got != 0 { + t.Errorf("notify endpoint must not be called when upload is rejected, got %d call(s)", got) + } +} + +// newAWSHeaderS3Server returns an httptest server that responds 200 to PUTs +// with the AWS request id headers a real S3 sets, so the agent's response +// verification accepts it. 
+func newAWSHeaderS3Server(t *testing.T) *httptest.Server {
+	t.Helper()
+	awsLike := func(w http.ResponseWriter, _ *http.Request) {
+		w.Header().Set("x-amz-request-id", "TESTREQID000000")
+		w.Header().Set("x-amz-id-2", "test-id-2")
+		w.WriteHeader(http.StatusOK)
+	}
+	return httptest.NewServer(http.HandlerFunc(awsLike))
+}
+
+// withTestConfig points the package config at endpoint with fixed test credentials; t.Cleanup restores the prior values.
+func withTestConfig(t *testing.T, endpoint string) {
+	t.Helper()
+	prevEndpoint, prevCustomer, prevKey := config.APIEndpoint, config.CustomerID, config.APIKey
+	t.Cleanup(func() {
+		config.APIEndpoint, config.CustomerID, config.APIKey = prevEndpoint, prevCustomer, prevKey
+	})
+	config.APIEndpoint, config.CustomerID, config.APIKey = endpoint, "test-customer", "test-key"
+}
+
+// TestUploadToS3_ConfirmUploadFalseIsFatal exercises the "backend confirms
+// the object is not in S3" branch — the PUT response looked real (AWS
+// headers present) but the backend HEAD says nothing landed. The agent
+// must bail and not call notify, because notify would only re-discover
+// the same fact moments later.
+func TestUploadToS3_ConfirmUploadFalseIsFatal(t *testing.T) {
+	var notifyHits atomic.Int32
+
+	s3 := newAWSHeaderS3Server(t)
+	defer s3.Close()
+
+	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch path := r.URL.Path; {
+		case strings.HasSuffix(path, "/telemetry/upload-url"):
+			_ = json.NewEncoder(w).Encode(map[string]string{
+				"upload_url": s3.URL + "/put",
+				"s3_key":     "developer-mdm/test-customer/dev-1/123.json.gz",
+			})
+		case strings.HasSuffix(path, "/telemetry/confirm-upload"):
+			// Explicit negative answer for the key issued by upload-url.
+			_ = json.NewEncoder(w).Encode(map[string]any{
+				"uploaded": false,
+				"reason":   "object_not_found",
+				"s3_key":   "developer-mdm/test-customer/dev-1/123.json.gz",
+			})
+		case strings.HasSuffix(path, "/telemetry/process-uploaded"):
+			notifyHits.Add(1)
+			w.WriteHeader(http.StatusOK)
+		default:
+			http.NotFound(w, r)
+		}
+	}))
+	defer backend.Close()
+	withTestConfig(t, backend.URL)
+
+	payload := &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}
+	err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), payload, "11111111-2222-4333-8444-555555555555")
+	if err == nil {
+		t.Fatal("uploadToS3 must fail when backend confirms upload is not in S3")
+	}
+	if !strings.Contains(err.Error(), "not in S3") {
+		t.Errorf("expected error to mention 'not in S3', got: %v", err)
+	}
+	if !strings.Contains(err.Error(), "object_not_found") {
+		t.Errorf("expected error to include the backend reason, got: %v", err)
+	}
+	if n := notifyHits.Load(); n != 0 {
+		t.Errorf("notify must not be called when confirm reports uploaded=false, got %d call(s)", n)
+	}
+}
+
+// TestUploadToS3_ConfirmUpload404FallsThroughToNotify covers compatibility
+// with older backends that don't expose /telemetry/confirm-upload yet.
+// A 404 on confirm must NOT fail the run — notify still gets called and
+// the upload completes via the existing flow.
+func TestUploadToS3_ConfirmUpload404FallsThroughToNotify(t *testing.T) {
+	var notifyHits atomic.Int32
+
+	s3 := newAWSHeaderS3Server(t)
+	defer s3.Close()
+
+	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch path := r.URL.Path; {
+		case strings.HasSuffix(path, "/telemetry/upload-url"):
+			_ = json.NewEncoder(w).Encode(map[string]string{
+				"upload_url": s3.URL + "/put",
+				"s3_key":     "developer-mdm/test-customer/dev-1/123.json.gz",
+			})
+		case strings.HasSuffix(path, "/telemetry/confirm-upload"):
+			// Behave like an older backend that has no such route.
+			http.NotFound(w, r)
+		case strings.HasSuffix(path, "/telemetry/process-uploaded"):
+			notifyHits.Add(1)
+			w.WriteHeader(http.StatusOK)
+		default:
+			http.NotFound(w, r)
+		}
+	}))
+	defer backend.Close()
+	withTestConfig(t, backend.URL)
+
+	payload := &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}
+	err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), payload, "11111111-2222-4333-8444-555555555555")
+	if err != nil {
+		t.Fatalf("uploadToS3 must succeed when confirm-upload is unsupported (404), got: %v", err)
+	}
+	// The 404 on confirm must not have suppressed the notify call.
+	if n := notifyHits.Load(); n != 1 {
+		t.Errorf("notify must still be called when confirm-upload returns 404, got %d call(s)", n)
+	}
+}
+
+// TestUploadToS3_ConfirmUpload5xxFallsThroughToNotify covers transient
+// backend failure of the confirm endpoint. The agent must not fail the
+// run for that — notify still has its own server-side precheck.
+func TestUploadToS3_ConfirmUpload5xxFallsThroughToNotify(t *testing.T) {
+	var notifyHits atomic.Int32
+
+	s3 := newAWSHeaderS3Server(t)
+	defer s3.Close()
+
+	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch path := r.URL.Path; {
+		case strings.HasSuffix(path, "/telemetry/upload-url"):
+			_ = json.NewEncoder(w).Encode(map[string]string{
+				"upload_url": s3.URL + "/put",
+				"s3_key":     "developer-mdm/test-customer/dev-1/123.json.gz",
+			})
+		case strings.HasSuffix(path, "/telemetry/confirm-upload"):
+			// Backend-side failure of the check itself, with an error body.
+			w.WriteHeader(http.StatusBadGateway)
+			_, _ = io.WriteString(w, `{"error":"s3_check_failed"}`)
+		case strings.HasSuffix(path, "/telemetry/process-uploaded"):
+			notifyHits.Add(1)
+			w.WriteHeader(http.StatusOK)
+		default:
+			http.NotFound(w, r)
+		}
+	}))
+	defer backend.Close()
+	withTestConfig(t, backend.URL)
+
+	payload := &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}
+	err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), payload, "11111111-2222-4333-8444-555555555555")
+	if err != nil {
+		t.Fatalf("uploadToS3 must succeed when confirm-upload returns 5xx, got: %v", err)
+	}
+	if n := notifyHits.Load(); n != 1 {
+		t.Errorf("notify must still be called when confirm-upload returns 5xx, got %d call(s)", n)
+	}
+}