Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions internal/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,22 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe
}

if err == nil && putResp.StatusCode == http.StatusOK {
// A real S3 PUT response always carries x-amz-request-id and
// x-amz-id-2 headers. If both are missing on a "200 OK", the
// response did not originate from AWS — typically a TLS-inspecting
// proxy, DLP appliance, or outbound-filtering firewall has
// terminated the connection and synthesized a success without
// forwarding the body to S3. Treat as a non-retryable failure so
// the run is reported as failed instead of cheerfully claiming
// success on bytes that never reached AWS.
reqID := putResp.Header.Get("x-amz-request-id")
id2 := putResp.Header.Get("x-amz-id-2")
if reqID == "" && id2 == "" {
proxyHint := putResp.Header.Get("Server")
_, _ = io.Copy(io.Discard, putResp.Body)
_ = putResp.Body.Close()
return fmt.Errorf("S3 PUT returned 200 but the response is not from AWS (missing x-amz-request-id and x-amz-id-2; Server=%q) — the upload likely did not reach S3, possibly intercepted by a TLS-inspecting proxy or outbound firewall on this network", proxyHint)
}
log.Progress("Uploaded to S3 in %s", elapsed)
break
}
Expand Down Expand Up @@ -709,6 +725,20 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe
defer func() { _ = putResp.Body.Close() }()
_, _ = io.Copy(io.Discard, putResp.Body)

// Ask the backend to confirm the object actually exists in S3 before
// proceeding to notify. This catches the rare case where the PUT
// returned an AWS-headers-bearing 200 but the bytes still failed to
// persist, and gives the user a definitive line in the progress log
// independent of "backend processing started".
//
// The endpoint is best-effort from the agent's point of view: a
// definitive uploaded=false is fatal, but transport errors or unknown
// status codes (e.g. an old backend that doesn't expose this route)
// fall through to notify, whose own precheck is the safety net.
if err := confirmUploadInS3(ctx, log, client, urlResp.S3Key, payload.DeviceID); err != nil {
return err
}

// Notify backend
log.Progress("Notifying backend of upload...")
notifyBody, _ := json.Marshal(map[string]string{
Expand Down Expand Up @@ -744,6 +774,79 @@ func uploadToS3(ctx context.Context, log *progress.Logger, payload *Payload, exe
return nil
}

// confirmUploadInS3 asks the backend's /telemetry/confirm-upload route for a
// definitive answer on whether the object at s3Key actually landed in S3.
// Only a confirmed uploaded=false is fatal and returns a non-nil error;
// every other outcome — transport failure, non-200 status, unparseable body,
// or a 404 from an older backend that lacks the route — is logged and
// tolerated, because the subsequent notify call performs its own
// server-side precheck as a safety net.
func confirmUploadInS3(ctx context.Context, log *progress.Logger, client *http.Client, s3Key, deviceID string) error {
	log.Progress("Confirming upload reached S3...")

	reqBody, _ := json.Marshal(map[string]string{
		"s3_key":    s3Key,
		"device_id": deviceID,
	})
	confirmURL := fmt.Sprintf("%s/v1/%s/developer-mdm-agent/telemetry/confirm-upload",
		config.APIEndpoint, config.CustomerID)

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, confirmURL, bytes.NewReader(reqBody))
	if err != nil {
		log.Warn("confirm-upload: request build failed (%v); proceeding to notify", err)
		return nil
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+config.APIKey)
	req.Header.Set("X-Agent-Version", buildinfo.Version)

	resp, err := client.Do(req)
	if err != nil {
		log.Warn("confirm-upload: request failed (%v); proceeding to notify", err)
		return nil
	}
	defer func() { _ = resp.Body.Close() }()

	switch resp.StatusCode {
	case http.StatusOK:
		// Genuine answer available — decode the verdict below.
	case http.StatusNotFound:
		// Backend predates the confirm-upload endpoint. Fine — notify's
		// precheck still catches a missing object server-side.
		log.Debug("confirm-upload: endpoint not available on this backend; falling back to notify-only flow")
		_, _ = io.Copy(io.Discard, resp.Body)
		return nil
	default:
		log.Warn("confirm-upload: HTTP %d; proceeding to notify", resp.StatusCode)
		_, _ = io.Copy(io.Discard, resp.Body)
		return nil
	}

	var verdict struct {
		Uploaded  bool   `json:"uploaded"`
		SizeBytes int64  `json:"size_bytes"`
		Reason    string `json:"reason"`
		S3Key     string `json:"s3_key"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&verdict); err != nil {
		log.Warn("confirm-upload: malformed response (%v); proceeding to notify", err)
		return nil
	}

	if verdict.Uploaded {
		log.Progress("Upload confirmed in S3 (%d bytes)", verdict.SizeBytes)
		return nil
	}

	// Definitive negative answer from the backend — the object the agent
	// thought it uploaded is not in S3. Stop here so the run is recorded
	// as failed with a specific reason, rather than queueing a doomed
	// worker job.
	reason := verdict.Reason
	if reason == "" {
		reason = "unknown"
	}
	return fmt.Errorf("backend reports upload not in S3 (reason=%s, s3_key=%s) — the PUT returned success but no object exists; likely intercepted upstream", reason, s3Key)
}

// gzipBytes returns a gzip-compressed copy of the input bytes.
func gzipBytes(data []byte) ([]byte, error) {
var buf bytes.Buffer
Expand Down
225 changes: 224 additions & 1 deletion internal/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http/httptest"
"strings"
"sync"
"sync/atomic"
"testing"

"github.com/step-security/dev-machine-guard/internal/config"
Expand Down Expand Up @@ -50,17 +51,21 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) {
)

// Mock S3 PUT endpoint — captures the body the agent uploads.
// Emits x-amz-request-id so the client's "real AWS response" check
// (uploadToS3 in telemetry.go) treats this 200 as genuine.
s3Server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
mu.Lock()
putBody = body
putContentType = r.Header.Get("Content-Type")
mu.Unlock()
w.Header().Set("x-amz-request-id", "TESTREQID000000")
w.Header().Set("x-amz-id-2", "test-id-2")
w.WriteHeader(http.StatusOK)
}))
defer s3Server.Close()

// Mock backend — handles upload-URL and process-uploaded calls.
// Mock backend — handles upload-URL, confirm-upload, and process-uploaded.
backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"):
Expand All @@ -72,6 +77,12 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) {
"upload_url": s3Server.URL + "/put",
"s3_key": "developer-mdm/test-customer/dev-1/123.json.gz",
})
case strings.HasSuffix(r.URL.Path, "/telemetry/confirm-upload"):
_ = json.NewEncoder(w).Encode(map[string]any{
"uploaded": true,
"size_bytes": 4242,
"s3_key": "developer-mdm/test-customer/dev-1/123.json.gz",
})
case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"):
body, _ := io.ReadAll(r.Body)
mu.Lock()
Expand Down Expand Up @@ -156,3 +167,215 @@ func TestUploadToS3_SendsCompressedBodyAndIsCompressedFlag(t *testing.T) {
t.Errorf("expected execution_id=%q in notify body, got %q", testExecutionID, notify["execution_id"])
}
}

// TestUploadToS3_RejectsSynthetic200WithoutAWSHeaders simulates a TLS-inspecting
// proxy / DLP appliance that terminates the agent's outbound PUT to S3 and
// returns a synthetic "200 OK" without forwarding the body. A real S3 response
// always carries x-amz-request-id and x-amz-id-2 — the agent must treat a 200
// missing both as an upload failure rather than silently calling notify with
// an s3_key whose object was never persisted.
func TestUploadToS3_RejectsSynthetic200WithoutAWSHeaders(t *testing.T) {
var notifyCalls atomic.Int32

// Mock "proxy" responding 200 with no AWS headers — what a corporate
// TLS interceptor produces when it terminates the PUT in-flight.
fakeProxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "fake-proxy/1.0")
w.WriteHeader(http.StatusOK)
}))
defer fakeProxy.Close()

backendServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case strings.HasSuffix(r.URL.Path, "/telemetry/upload-url"):
_ = json.NewEncoder(w).Encode(map[string]string{
"upload_url": fakeProxy.URL + "/put",
"s3_key": "developer-mdm/test-customer/dev-1/123.json.gz",
})
case strings.HasSuffix(r.URL.Path, "/telemetry/process-uploaded"):
notifyCalls.Add(1)
w.WriteHeader(http.StatusOK)
default:
http.NotFound(w, r)
}
}))
defer backendServer.Close()

origEndpoint, origCustomer, origKey := config.APIEndpoint, config.CustomerID, config.APIKey
config.APIEndpoint = backendServer.URL
config.CustomerID = "test-customer"
config.APIKey = "test-key"
defer func() {
config.APIEndpoint, config.CustomerID, config.APIKey = origEndpoint, origCustomer, origKey
}()

payload := &Payload{CustomerID: "test-customer", DeviceID: "dev-1"}
err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo), payload, "11111111-2222-4333-8444-555555555555")
if err == nil {
t.Fatalf("uploadToS3 should fail when the PUT response is missing AWS request id headers")
}
if !strings.Contains(err.Error(), "not from AWS") {
t.Errorf("expected error to mention 'not from AWS', got: %v", err)
}
if !strings.Contains(err.Error(), "fake-proxy/1.0") {
t.Errorf("expected error to include the Server header hint, got: %v", err)
}
if got := notifyCalls.Load(); got != 0 {
t.Errorf("notify endpoint must not be called when upload is rejected, got %d call(s)", got)
}
}

// newAWSHeaderS3Server spins up a stub S3 endpoint whose 200 responses
// carry the x-amz-request-id / x-amz-id-2 headers that real S3 always
// sets, so the agent's "response is genuinely from AWS" verification
// accepts the mock.
func newAWSHeaderS3Server(t *testing.T) *httptest.Server {
	t.Helper()
	handler := func(w http.ResponseWriter, r *http.Request) {
		h := w.Header()
		h.Set("x-amz-request-id", "TESTREQID000000")
		h.Set("x-amz-id-2", "test-id-2")
		w.WriteHeader(http.StatusOK)
	}
	return httptest.NewServer(http.HandlerFunc(handler))
}

// withTestConfig points the package-level API config at the given backend
// endpoint (with fixed test customer/key values) for the duration of the
// test, restoring the previous values via t.Cleanup.
func withTestConfig(t *testing.T, endpoint string) {
	t.Helper()
	prevEndpoint := config.APIEndpoint
	prevCustomer := config.CustomerID
	prevKey := config.APIKey
	t.Cleanup(func() {
		config.APIEndpoint = prevEndpoint
		config.CustomerID = prevCustomer
		config.APIKey = prevKey
	})
	config.APIEndpoint = endpoint
	config.CustomerID = "test-customer"
	config.APIKey = "test-key"
}

// TestUploadToS3_ConfirmUploadFalseIsFatal exercises the "backend confirms
// the object is not in S3" branch — the PUT response looked real (AWS
// headers present) but the backend HEAD says nothing landed. The agent
// must bail and not call notify, because notify would only re-discover
// the same fact moments later.
func TestUploadToS3_ConfirmUploadFalseIsFatal(t *testing.T) {
	var notified atomic.Int32

	s3 := newAWSHeaderS3Server(t)
	defer s3.Close()

	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		p := r.URL.Path
		switch {
		case strings.HasSuffix(p, "/telemetry/upload-url"):
			_ = json.NewEncoder(w).Encode(map[string]string{
				"upload_url": s3.URL + "/put",
				"s3_key":     "developer-mdm/test-customer/dev-1/123.json.gz",
			})
		case strings.HasSuffix(p, "/telemetry/confirm-upload"):
			// Definitive negative: nothing persisted under that key.
			_ = json.NewEncoder(w).Encode(map[string]any{
				"uploaded": false,
				"reason":   "object_not_found",
				"s3_key":   "developer-mdm/test-customer/dev-1/123.json.gz",
			})
		case strings.HasSuffix(p, "/telemetry/process-uploaded"):
			notified.Add(1)
			w.WriteHeader(http.StatusOK)
		default:
			http.NotFound(w, r)
		}
	}))
	defer backend.Close()
	withTestConfig(t, backend.URL)

	err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo),
		&Payload{CustomerID: "test-customer", DeviceID: "dev-1"},
		"11111111-2222-4333-8444-555555555555")
	if err == nil {
		t.Fatal("uploadToS3 must fail when backend confirms upload is not in S3")
	}
	if !strings.Contains(err.Error(), "not in S3") {
		t.Errorf("expected error to mention 'not in S3', got: %v", err)
	}
	if !strings.Contains(err.Error(), "object_not_found") {
		t.Errorf("expected error to include the backend reason, got: %v", err)
	}
	if calls := notified.Load(); calls != 0 {
		t.Errorf("notify must not be called when confirm reports uploaded=false, got %d call(s)", calls)
	}
}

// TestUploadToS3_ConfirmUpload404FallsThroughToNotify covers compatibility
// with older backends that don't expose /telemetry/confirm-upload yet.
// A 404 on confirm must NOT fail the run — notify still gets called and
// the upload completes via the existing flow.
func TestUploadToS3_ConfirmUpload404FallsThroughToNotify(t *testing.T) {
	var notified atomic.Int32

	s3 := newAWSHeaderS3Server(t)
	defer s3.Close()

	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		p := r.URL.Path
		switch {
		case strings.HasSuffix(p, "/telemetry/upload-url"):
			_ = json.NewEncoder(w).Encode(map[string]string{
				"upload_url": s3.URL + "/put",
				"s3_key":     "developer-mdm/test-customer/dev-1/123.json.gz",
			})
		case strings.HasSuffix(p, "/telemetry/confirm-upload"):
			// Simulate an old backend that doesn't know this route.
			http.NotFound(w, r)
		case strings.HasSuffix(p, "/telemetry/process-uploaded"):
			notified.Add(1)
			w.WriteHeader(http.StatusOK)
		default:
			http.NotFound(w, r)
		}
	}))
	defer backend.Close()
	withTestConfig(t, backend.URL)

	err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo),
		&Payload{CustomerID: "test-customer", DeviceID: "dev-1"},
		"11111111-2222-4333-8444-555555555555")
	if err != nil {
		t.Fatalf("uploadToS3 must succeed when confirm-upload is unsupported (404), got: %v", err)
	}
	if calls := notified.Load(); calls != 1 {
		t.Errorf("notify must still be called when confirm-upload returns 404, got %d call(s)", calls)
	}
}

// TestUploadToS3_ConfirmUpload5xxFallsThroughToNotify covers transient
// backend failure of the confirm endpoint. The agent must not fail the
// run for that — notify still has its own server-side precheck.
func TestUploadToS3_ConfirmUpload5xxFallsThroughToNotify(t *testing.T) {
	var notified atomic.Int32

	s3 := newAWSHeaderS3Server(t)
	defer s3.Close()

	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		p := r.URL.Path
		switch {
		case strings.HasSuffix(p, "/telemetry/upload-url"):
			_ = json.NewEncoder(w).Encode(map[string]string{
				"upload_url": s3.URL + "/put",
				"s3_key":     "developer-mdm/test-customer/dev-1/123.json.gz",
			})
		case strings.HasSuffix(p, "/telemetry/confirm-upload"):
			// Transient server-side failure of the S3 check itself.
			w.WriteHeader(http.StatusBadGateway)
			_, _ = w.Write([]byte(`{"error":"s3_check_failed"}`))
		case strings.HasSuffix(p, "/telemetry/process-uploaded"):
			notified.Add(1)
			w.WriteHeader(http.StatusOK)
		default:
			http.NotFound(w, r)
		}
	}))
	defer backend.Close()
	withTestConfig(t, backend.URL)

	err := uploadToS3(context.Background(), progress.NewLogger(progress.LevelInfo),
		&Payload{CustomerID: "test-customer", DeviceID: "dev-1"},
		"11111111-2222-4333-8444-555555555555")
	if err != nil {
		t.Fatalf("uploadToS3 must succeed when confirm-upload returns 5xx, got: %v", err)
	}
	if calls := notified.Load(); calls != 1 {
		t.Errorf("notify must still be called when confirm-upload returns 5xx, got %d call(s)", calls)
	}
}
Loading