diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index 74aa9284..ce5fbf83 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -43,12 +43,84 @@ func NewLogStore(chDB clickhouse.DB, deploymentID string) driver.LogStore { } } +// maxDedupIterations caps the number of fetch rounds in fetchAndDedup. +// In practice the loop almost never exceeds 1 iteration (duplicates are rare +// after the write-path fix), but this prevents runaway queries against +// pathological data with extreme duplication. +const maxDedupIterations = 10 + +// fetchAndDedup queries ClickHouse and deduplicates results by a key. If +// duplicates reduce the result count below the requested limit, it advances +// the cursor and fetches more rows until the limit is met or data is exhausted. +// This avoids LIMIT 1 BY / GROUP BY on large result sets while still hiding +// duplicates from unmerged ReplacingMergeTree parts. +func fetchAndDedup[T any]( + ctx context.Context, + chDB clickhouse.DB, + q pagination.QueryInput, + buildQuery func(pagination.QueryInput) (string, []any), + scan func(clickhouse.Rows) ([]T, error), + getID func(T) string, + getCursorPos func(T) string, +) ([]T, error) { + seen := make(map[string]bool) + var deduped []T + cursorPos := q.CursorPos + + for iter := 0; len(deduped) < q.Limit && iter < maxDedupIterations; iter++ { + qi := pagination.QueryInput{ + Limit: q.Limit, + Compare: q.Compare, + SortDir: q.SortDir, + CursorPos: cursorPos, + } + query, args := buildQuery(qi) + rows, err := chDB.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("query failed: %w", err) + } + scanned, err := scan(rows) + rows.Close() + if err != nil { + return nil, err + } + + for _, item := range scanned { + id := getID(item) + if !seen[id] { + seen[id] = true + deduped = append(deduped, item) + } + } + + // Fewer rows than requested means we've exhausted the data. + if len(scanned) < q.Limit { + break + } + + // Advance cursor past last scanned row for next iteration. + cursorPos = getCursorPos(scanned[len(scanned)-1]) + } + + // Truncate to the requested limit — the last batch may have added + // more unique items than needed to reach the limit. + if len(deduped) > q.Limit { + deduped = deduped[:q.Limit] + } + + return deduped, nil +} + // eventWithPosition wraps an event with its cursor position data. type eventWithPosition struct { *models.Event eventTime time.Time } +func (e eventWithPosition) cursorPosition() string { + return fmt.Sprintf("%d::%s", e.eventTime.UnixMilli(), e.Event.ID) +} + func (s *logStoreImpl) ListEvent(ctx context.Context, req driver.ListEventRequest) (driver.ListEventResponse, error) { sortOrder := req.SortOrder if sortOrder != "asc" && sortOrder != "desc" { @@ -66,18 +138,15 @@ func (s *logStoreImpl) ListEvent(ctx context.Context, req driver.ListEventReques Next: req.Next, Prev: req.Prev, Fetch: func(ctx context.Context, q pagination.QueryInput) ([]eventWithPosition, error) { - query, args := buildEventQuery(s.eventsTable, req, q) - rows, err := s.chDB.Query(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("query failed: %w", err) - } - defer rows.Close() - return scanEvents(rows) + return fetchAndDedup(ctx, s.chDB, q, func(qi pagination.QueryInput) (string, []any) { + return buildEventQuery(s.eventsTable, req, qi) + }, scanEvents, func(e eventWithPosition) string { + return e.Event.ID + }, eventWithPosition.cursorPosition) }, Cursor: pagination.Cursor[eventWithPosition]{ Encode: func(e eventWithPosition) string { - position := fmt.Sprintf("%d::%s", e.eventTime.UnixMilli(), e.Event.ID) - return cursor.Encode(cursorResourceEvent, cursorVersion, position) + return cursor.Encode(cursorResourceEvent, cursorVersion, e.cursorPosition()) }, Decode: func(c string) (string, error) { return cursor.Decode(c, cursorResourceEvent, cursorVersion) @@ -156,10 +225,6 @@ func buildEventQuery(table string, req driver.ListEventRequest, q pagination.Que orderByClause := fmt.Sprintf("ORDER BY event_time %s, event_id %s", strings.ToUpper(q.SortDir), strings.ToUpper(q.SortDir)) - // Note: We intentionally omit FINAL to avoid forcing ClickHouse to merge all parts - // before returning results. The events table uses ReplacingMergeTree, so duplicates - // may briefly appear before background merges consolidate them. This is acceptable - // for log viewing and maintains O(limit) query performance. query := fmt.Sprintf(` SELECT event_id, @@ -266,6 +331,10 @@ type attemptRecordWithPosition struct { attemptTime time.Time } +func (ar attemptRecordWithPosition) cursorPosition() string { + return fmt.Sprintf("%d::%s", ar.attemptTime.UnixMilli(), ar.Attempt.ID) +} + func (s *logStoreImpl) ListAttempt(ctx context.Context, req driver.ListAttemptRequest) (driver.ListAttemptResponse, error) { sortOrder := req.SortOrder if sortOrder != "asc" && sortOrder != "desc" { @@ -283,18 +352,15 @@ func (s *logStoreImpl) ListAttempt(ctx context.Context, req driver.ListAttemptRe Next: req.Next, Prev: req.Prev, Fetch: func(ctx context.Context, q pagination.QueryInput) ([]attemptRecordWithPosition, error) { - query, args := buildAttemptQuery(s.attemptsTable, req, q) - rows, err := s.chDB.Query(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("query failed: %w", err) - } - defer rows.Close() - return scanAttemptRecords(rows) + return fetchAndDedup(ctx, s.chDB, q, func(qi pagination.QueryInput) (string, []any) { + return buildAttemptQuery(s.attemptsTable, req, qi) + }, scanAttemptRecords, func(ar attemptRecordWithPosition) string { + return ar.Attempt.ID + }, attemptRecordWithPosition.cursorPosition) }, Cursor: pagination.Cursor[attemptRecordWithPosition]{ Encode: func(ar attemptRecordWithPosition) string { - position := fmt.Sprintf("%d::%s", ar.attemptTime.UnixMilli(), ar.Attempt.ID) - return cursor.Encode(cursorResourceAttempt, cursorVersion, position) + return cursor.Encode(cursorResourceAttempt, cursorVersion, ar.cursorPosition()) }, Decode: func(c string) (string, error) { return cursor.Decode(c, cursorResourceAttempt, cursorVersion) @@ -700,10 +766,14 @@ func (s *logStoreImpl) InsertMany(ctx context.Context, entries []*models.LogEntr return nil } - // Extract and dedupe events by ID + // Extract and dedupe events by ID, skipping retry attempts. + // Retries (AttemptNumber > 1) carry identical event data — the event row + // already exists from the first attempt's batch. eventMap := make(map[string]*models.Event) for _, entry := range entries { - eventMap[entry.Event.ID] = entry.Event + if entry.Attempt.AttemptNumber <= 1 { + eventMap[entry.Event.ID] = entry.Event + } } if len(eventMap) > 0 { diff --git a/internal/logstore/chlogstore/chlogstore_test.go b/internal/logstore/chlogstore/chlogstore_test.go index 827c35c1..d50a8931 100644 --- a/internal/logstore/chlogstore/chlogstore_test.go +++ b/internal/logstore/chlogstore/chlogstore_test.go @@ -3,13 +3,17 @@ package chlogstore import ( "context" "testing" + "time" "github.com/hookdeck/outpost/internal/clickhouse" "github.com/hookdeck/outpost/internal/logstore/driver" "github.com/hookdeck/outpost/internal/logstore/drivertest" + "github.com/hookdeck/outpost/internal/pagination" "github.com/hookdeck/outpost/internal/migrator" + "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -113,6 +117,206 @@ func newHarnessWithDeploymentID(ctx context.Context, t *testing.T) (drivertest.H }, nil } +// TestEventDedup verifies that duplicate event rows are prevented at the write +// path and deduplicated at the read path. The dataset models a realistic webhook +// delivery scenario: +// +// ── Dataset ────────────────────────────────────────────────────────────────── +// +// Event A ("order.created"): Fanout to 3 destinations in one batch. +// Batch 1: dest-a #1 success, dest-b #1 failed, dest-c #1 success +// Batch 2: dest-b #2 success (retry) +// +// Event B ("user.signup"): Single destination, fails twice then succeeds. +// Batch 3: dest-a #1 failed +// Batch 4: dest-a #2 failed (retry) +// Batch 5: dest-a #3 success (retry) +// +// Event C ("payment.received"): Single destination, succeeds first try. +// Batch 6: dest-b #1 success +// +// ── Expected ───────────────────────────────────────────────────────────────── +// Event rows: 3 (one per unique event, retries skipped by write path) +// Attempt rows: 8 (A×4 + B×3 + C×1, all persisted) +// ListEvent: 3 unique events (client-side dedup hides any stragglers) +// +// After injecting legacy duplicates (raw batch inserts simulating pre-fix data): +// Event rows: 9 (3 original + 6 injected) +// ListEvent: still 3 (read-path dedup) +func TestEventDedup(t *testing.T) { + testutil.CheckIntegrationTest(t) + t.Parallel() + + ctx := context.Background() + chDB := setupClickHouseConnection(t) + defer chDB.Close() + + logStore := NewLogStore(chDB, "") + + tenantID := "dedup-tenant" + baseTime := time.Now().Truncate(time.Second) + + eventA := &models.Event{ + ID: "evt-a", TenantID: tenantID, + MatchedDestinationIDs: []string{"dest-a", "dest-b", "dest-c"}, + Topic: "order.created", EligibleForRetry: true, + Time: baseTime, Data: []byte(`{"order_id":"100"}`), + } + eventB := &models.Event{ + ID: "evt-b", TenantID: tenantID, + MatchedDestinationIDs: []string{"dest-a"}, + Topic: "user.signup", EligibleForRetry: true, + Time: baseTime.Add(-1 * time.Minute), Data: []byte(`{"user_id":"42"}`), + } + eventC := &models.Event{ + ID: "evt-c", TenantID: tenantID, + MatchedDestinationIDs: []string{"dest-b"}, + Topic: "payment.received", EligibleForRetry: false, + Time: baseTime.Add(-2 * time.Minute), Data: []byte(`{"amount":99}`), + } + + att := func(id, eventID, destID string, num int, status string, t time.Time) *models.Attempt { + return &models.Attempt{ + ID: id, TenantID: tenantID, EventID: eventID, + DestinationID: destID, AttemptNumber: num, Status: status, Time: t, + } + } + + // Batch 1: Event A fanout — 3 destinations + require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{ + {Event: eventA, Attempt: att("att-a1", "evt-a", "dest-a", 1, "success", baseTime)}, + {Event: eventA, Attempt: att("att-a2", "evt-a", "dest-b", 1, "failed", baseTime)}, + {Event: eventA, Attempt: att("att-a3", "evt-a", "dest-c", 1, "success", baseTime)}, + })) + // Batch 2: Event A retry for dest-b + require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{ + {Event: eventA, Attempt: att("att-a4", "evt-a", "dest-b", 2, "success", baseTime.Add(time.Second))}, + })) + // Batch 3-5: Event B — fails, retries, succeeds + require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{ + {Event: eventB, Attempt: att("att-b1", "evt-b", "dest-a", 1, "failed", baseTime.Add(-1*time.Minute))}, + })) + require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{ + {Event: eventB, Attempt: att("att-b2", "evt-b", "dest-a", 2, "failed", baseTime.Add(-1*time.Minute+time.Second))}, + })) + require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{ + {Event: eventB, Attempt: att("att-b3", "evt-b", "dest-a", 3, "success", baseTime.Add(-1*time.Minute+2*time.Second))}, + })) + // Batch 6: Event C — single success + require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{ + {Event: eventC, Attempt: att("att-c1", "evt-c", "dest-b", 1, "success", baseTime.Add(-2*time.Minute))}, + })) + + // ── Write-path: raw row counts ────────────────────────────────────── + + var eventRows uint64 + row := chDB.QueryRow(ctx, "SELECT count() FROM events WHERE tenant_id = ?", tenantID) + require.NoError(t, row.Scan(&eventRows)) + assert.Equal(t, uint64(3), eventRows, "retries should not re-insert event rows") + + var attemptRows uint64 + row = chDB.QueryRow(ctx, "SELECT count() FROM attempts WHERE tenant_id = ?", tenantID) + require.NoError(t, row.Scan(&attemptRows)) + assert.Equal(t, uint64(8), attemptRows, "all attempts persisted") + + // ── Read-path: ListEvent deduplicates ──────────────────────────────── + + startTime := baseTime.Add(-10 * time.Minute) + resp, err := logStore.ListEvent(ctx, driver.ListEventRequest{ + TenantIDs: []string{tenantID}, + Limit: 100, + TimeFilter: driver.TimeFilter{GTE: &startTime}, + }) + require.NoError(t, err) + assert.Len(t, resp.Data, 3, "ListEvent returns 3 unique events") + + seen := map[string]bool{} + for _, evt := range resp.Data { + assert.False(t, seen[evt.ID], "duplicate in ListEvent: %s", evt.ID) + seen[evt.ID] = true + } + assert.True(t, seen["evt-a"]) + assert.True(t, seen["evt-b"]) + assert.True(t, seen["evt-c"]) + + // ── Read-path with legacy duplicates ───────────────────────────────── + // Inject duplicate event rows via raw batch inserts (simulates pre-fix data). + + for range 3 { + batch, batchErr := chDB.PrepareBatch(ctx, `INSERT INTO events (event_id, tenant_id, matched_destination_ids, topic, eligible_for_retry, event_time, metadata, data)`) + require.NoError(t, batchErr) + require.NoError(t, batch.Append("evt-a", tenantID, []string{"dest-a", "dest-b", "dest-c"}, "order.created", true, baseTime, "{}", `{"order_id":"100"}`)) + require.NoError(t, batch.Append("evt-b", tenantID, []string{"dest-a"}, "user.signup", true, baseTime.Add(-1*time.Minute), "{}", `{"user_id":"42"}`)) + require.NoError(t, batch.Send()) + } + + row = chDB.QueryRow(ctx, "SELECT count() FROM events WHERE tenant_id = ?", tenantID) + require.NoError(t, row.Scan(&eventRows)) + assert.Equal(t, uint64(9), eventRows, "3 original + 6 injected legacy duplicates") + + resp, err = logStore.ListEvent(ctx, driver.ListEventRequest{ + TenantIDs: []string{tenantID}, + Limit: 100, + TimeFilter: driver.TimeFilter{GTE: &startTime}, + }) + require.NoError(t, err) + assert.Len(t, resp.Data, 3, "client-side dedup hides legacy rows") +} + +// TestFetchAndDedupTruncation verifies that fetchAndDedup never returns more +// items than the requested limit. When duplicates cause the first batch to +// yield fewer unique items, the next batch may overshoot the limit without +// truncation. +// +// Setup: 1 event duplicated 2× plus 3 unique events. With limit=2 and DESC: +// +// Batch 1: [A, A] → dedup → [A] (1 < 2, loop continues) +// Batch 2: [B, C] → dedup → [A, B, C] (3 items — exceeds limit) +func TestFetchAndDedupTruncation(t *testing.T) { + testutil.CheckIntegrationTest(t) + t.Parallel() + + ctx := context.Background() + chDB := setupClickHouseConnection(t) + defer chDB.Close() + + tenantID := "dedup-truncation" + baseTime := time.Now().Truncate(time.Second) + + insertRawEvent := func(id string, eventTime time.Time) { + batch, err := chDB.PrepareBatch(ctx, `INSERT INTO events (event_id, tenant_id, matched_destination_ids, topic, eligible_for_retry, event_time, metadata, data)`) + require.NoError(t, err) + require.NoError(t, batch.Append(id, tenantID, []string{"dest-1"}, "test.topic", true, eventTime, "{}", `{}`)) + require.NoError(t, batch.Send()) + } + + // Two rows for event A (same ID = duplicate in separate parts) + insertRawEvent("evt-trunc-a", baseTime) + insertRawEvent("evt-trunc-a", baseTime) + // Unique events at decreasing times + insertRawEvent("evt-trunc-b", baseTime.Add(-1*time.Second)) + insertRawEvent("evt-trunc-c", baseTime.Add(-2*time.Second)) + insertRawEvent("evt-trunc-d", baseTime.Add(-3*time.Second)) + + startTime := baseTime.Add(-10 * time.Minute) + limit := 2 + result, err := fetchAndDedup(ctx, chDB, pagination.QueryInput{ + Limit: limit, + Compare: "<", + SortDir: "desc", + }, func(qi pagination.QueryInput) (string, []any) { + return buildEventQuery("events", driver.ListEventRequest{ + TenantIDs: []string{tenantID}, + TimeFilter: driver.TimeFilter{GTE: &startTime}, + }, qi) + }, scanEvents, func(e eventWithPosition) string { + return e.Event.ID + }, eventWithPosition.cursorPosition) + require.NoError(t, err) + assert.LessOrEqual(t, len(result), limit, + "fetchAndDedup must not return more items than the requested limit") +} + func setupClickHouseConnectionWithDeploymentID(t *testing.T, deploymentID string) clickhouse.DB { t.Helper() t.Cleanup(testinfra.Start(t)) diff --git a/internal/logstore/drivertest/metrics_data_correctness.go b/internal/logstore/drivertest/metrics_data_correctness.go index c7d3a349..c40de3d5 100644 --- a/internal/logstore/drivertest/metrics_data_correctness.go +++ b/internal/logstore/drivertest/metrics_data_correctness.go @@ -371,10 +371,10 @@ func testMetricsDataCorrectness(t *testing.T, ctx context.Context, logStore driv require.NotNil(t, dp.RetryCount) require.NotNil(t, dp.ManualRetryCount) require.NotNil(t, dp.AvgAttemptNumber) - assert.Equal(t, 75, *dp.FirstAttemptCount) - assert.Equal(t, 225, *dp.RetryCount) + assert.Equal(t, 270, *dp.FirstAttemptCount) // 300 - 30 manual + assert.Equal(t, 0, *dp.RetryCount) assert.Equal(t, 30, *dp.ManualRetryCount) - assert.InDelta(t, 2.5, *dp.AvgAttemptNumber, 0.001) + assert.InDelta(t, 1.0, *dp.AvgAttemptNumber, 0.001) }) t.Run("rate no granularity", func(t *testing.T) { @@ -482,7 +482,7 @@ func testMetricsDataCorrectness(t *testing.T, ctx context.Context, logStore driv Dimensions: []string{"attempt_number"}, }) require.NoError(t, err) - assert.Len(t, resp.Data, 4) + assert.Len(t, resp.Data, 1) ac := map[int]int{} for _, dp := range resp.Data { @@ -490,11 +490,8 @@ func testMetricsDataCorrectness(t *testing.T, ctx context.Context, logStore driv require.NotNil(t, dp.Count) ac[*dp.AttemptNumber] = *dp.Count } - // attempt_number = i % 4 + 1 → each value appears 75 times - assert.Equal(t, 75, ac[1]) - assert.Equal(t, 75, ac[2]) - assert.Equal(t, 75, ac[3]) - assert.Equal(t, 75, ac[4]) + // All entries have attempt_number=1 + assert.Equal(t, 300, ac[1]) }) t.Run("by code", func(t *testing.T) { @@ -576,7 +573,7 @@ func testMetricsDataCorrectness(t *testing.T, ctx context.Context, logStore driv require.NoError(t, err) require.Len(t, resp.Data, 1) require.NotNil(t, resp.Data[0].Count) - assert.Equal(t, 75, *resp.Data[0].Count) + assert.Equal(t, 300, *resp.Data[0].Count) }) t.Run("granularity 1h on dense day", func(t *testing.T) { diff --git a/internal/logstore/drivertest/metrics_dataset.go b/internal/logstore/drivertest/metrics_dataset.go index 18efd521..0942037c 100644 --- a/internal/logstore/drivertest/metrics_dataset.go +++ b/internal/logstore/drivertest/metrics_dataset.go @@ -58,14 +58,10 @@ import ( // status: i % 5 → 0,1,2=success, 3,4=failed // code: success → i%2==0 ? "200" : "201" // failed → i%2==0 ? "500" : "422" -// attempt_number: i % 4 + 1 → 1,2,3,4 +// attempt_number: 1 (each entry is a unique event, not a retry) // manual: i % 10 == 9 // eligible_for_retry: i % 3 != 2 // -// FIXME: manual retries should always have attempt_number=1 (they start a new -// chain), but this dataset assigns them independently. Update the formula so -// manual=true implies attempt_number=1, then fix derived totals and assertions. -// // ── Derived Totals (Tenant 1, all 300) ─────────────────────────────────── // // Event metrics: @@ -83,10 +79,10 @@ import ( // successful_rate (no gran): 180/2678400 // failed_rate (no gran): 120/2678400 // by code: 200=90, 201=90, 500=60, 422=60 -// first_attempt (i%4+1==1): 75 -// retry (i%4+1>1): 225 +// first_attempt (attempt_number==1 AND !manual): 270 +// retry (attempt_number>1): 0 // manual (i%10==9): 30 -// avg_attempt_number: 750/300 = 2.5 +// avg_attempt_number: 1.0 // // Dense day — Jan 15 (250 events, indices 50..299): // hourly buckets: 10:00→25, 11:00→50, 12:00→100, 13:00→50, 14:00→25 @@ -179,7 +175,7 @@ func buildMetricsDataset() *metricsDataset { status = "failed" } code := codes[status][idx%2] - attemptNum := idx%4 + 1 + attemptNum := 1 // Each entry is a unique event, not a retry manual := idx%10 == 9 eligible := idx%3 != 2 diff --git a/internal/logstore/memlogstore/memlogstore.go b/internal/logstore/memlogstore/memlogstore.go index 6d8dc641..a6a191dd 100644 --- a/internal/logstore/memlogstore/memlogstore.go +++ b/internal/logstore/memlogstore/memlogstore.go @@ -196,8 +196,10 @@ func (s *memLogStore) InsertMany(ctx context.Context, entries []*models.LogEntry defer s.mu.Unlock() for _, entry := range entries { - // Insert event (dedupe by ID) - s.events[entry.Event.ID] = copyEvent(entry.Event) + // Insert event (dedupe by ID, skip retries) + if entry.Attempt.AttemptNumber <= 1 { + s.events[entry.Event.ID] = copyEvent(entry.Event) + } // Insert attempt (idempotent upsert: match on event_id + attempt_id) a := entry.Attempt diff --git a/internal/logstore/pglogstore/pglogstore.go b/internal/logstore/pglogstore/pglogstore.go index 75cb9512..1cd0585e 100644 --- a/internal/logstore/pglogstore/pglogstore.go +++ b/internal/logstore/pglogstore/pglogstore.go @@ -718,10 +718,14 @@ func (s *logStore) InsertMany(ctx context.Context, entries []*models.LogEntry) e return nil } - // Extract and dedupe events by ID + // Extract and dedupe events by ID, skipping retry attempts. + // Retries (AttemptNumber > 1) carry identical event data — the event row + // already exists from the first attempt's batch. eventMap := make(map[string]*models.Event) for _, entry := range entries { - eventMap[entry.Event.ID] = entry.Event + if entry.Attempt.AttemptNumber <= 1 { + eventMap[entry.Event.ID] = entry.Event + } } events := make([]*models.Event, 0, len(eventMap)) for _, e := range eventMap {