From 9ae63bdda72fe7ebd3ca1ca590eb4b111c3d88c6 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 17 Apr 2026 16:24:00 +0700 Subject: [PATCH 1/8] fix: skip event insert for retry attempts across all logstores Gate event insertion on AttemptNumber <= 1 in InsertMany for chlogstore, pglogstore, and memlogstore. Retry attempts carry identical event data, so re-inserting is wasteful (and creates duplicates in ClickHouse's ReplacingMergeTree before background merge). Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/logstore/chlogstore/chlogstore.go | 16 ++++++++++------ internal/logstore/memlogstore/memlogstore.go | 6 ++++-- internal/logstore/pglogstore/pglogstore.go | 8 ++++++-- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index 74aa9284..164506ea 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -156,10 +156,9 @@ func buildEventQuery(table string, req driver.ListEventRequest, q pagination.Que orderByClause := fmt.Sprintf("ORDER BY event_time %s, event_id %s", strings.ToUpper(q.SortDir), strings.ToUpper(q.SortDir)) - // Note: We intentionally omit FINAL to avoid forcing ClickHouse to merge all parts - // before returning results. The events table uses ReplacingMergeTree, so duplicates - // may briefly appear before background merges consolidate them. This is acceptable - // for log viewing and maintains O(limit) query performance. + // We omit FINAL to avoid forcing ClickHouse to merge all parts at query time. + // Instead, LIMIT 1 BY event_id deduplicates in the result stream: after ORDER BY, + // it keeps only the first row per event_id, then the outer LIMIT caps the result set. query := fmt.Sprintf(` SELECT event_id, @@ -173,6 +172,7 @@ func buildEventQuery(table string, req driver.ListEventRequest, q pagination.Que FROM %s WHERE %s %s + LIMIT 1 BY event_id LIMIT %d `, table, whereClause, orderByClause, q.Limit) @@ -700,10 +700,14 @@ func (s *logStoreImpl) InsertMany(ctx context.Context, entries []*models.LogEntr return nil } - // Extract and dedupe events by ID + // Extract and dedupe events by ID, skipping retry attempts. + // Retries (AttemptNumber > 1) carry identical event data — the event row + // already exists from the first attempt's batch. eventMap := make(map[string]*models.Event) for _, entry := range entries { - eventMap[entry.Event.ID] = entry.Event + if entry.Attempt.AttemptNumber <= 1 { + eventMap[entry.Event.ID] = entry.Event + } } if len(eventMap) > 0 { diff --git a/internal/logstore/memlogstore/memlogstore.go b/internal/logstore/memlogstore/memlogstore.go index 6d8dc641..a6a191dd 100644 --- a/internal/logstore/memlogstore/memlogstore.go +++ b/internal/logstore/memlogstore/memlogstore.go @@ -196,8 +196,10 @@ func (s *memLogStore) InsertMany(ctx context.Context, entries []*models.LogEntry defer s.mu.Unlock() for _, entry := range entries { - // Insert event (dedupe by ID) - s.events[entry.Event.ID] = copyEvent(entry.Event) + // Insert event (dedupe by ID, skip retries) + if entry.Attempt.AttemptNumber <= 1 { + s.events[entry.Event.ID] = copyEvent(entry.Event) + } // Insert attempt (idempotent upsert: match on event_id + attempt_id) a := entry.Attempt diff --git a/internal/logstore/pglogstore/pglogstore.go b/internal/logstore/pglogstore/pglogstore.go index 75cb9512..1cd0585e 100644 --- a/internal/logstore/pglogstore/pglogstore.go +++ b/internal/logstore/pglogstore/pglogstore.go @@ -718,10 +718,14 @@ func (s *logStore) InsertMany(ctx context.Context, entries []*models.LogEntry) e return nil } - // Extract and dedupe events by ID + // Extract and dedupe events by ID, skipping retry attempts. + // Retries (AttemptNumber > 1) carry identical event data — the event row + // already exists from the first attempt's batch. eventMap := make(map[string]*models.Event) for _, entry := range entries { - eventMap[entry.Event.ID] = entry.Event + if entry.Attempt.AttemptNumber <= 1 { + eventMap[entry.Event.ID] = entry.Event + } } events := make([]*models.Event, 0, len(eventMap)) for _, e := range eventMap { From da68d90c648efdfef8971673de2050c56532056e Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 17 Apr 2026 16:24:12 +0700 Subject: [PATCH 2/8] test: add TestEventDedup integration test for event deduplication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Comprehensive test covering fanout (1 event → 3 destinations), retries (3 attempts for same event), and legacy duplicate rows injected via raw batch inserts. Verifies both write-path row counts and read-path ListEvent uniqueness via LIMIT 1 BY event_id. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../logstore/chlogstore/chlogstore_test.go | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) diff --git a/internal/logstore/chlogstore/chlogstore_test.go b/internal/logstore/chlogstore/chlogstore_test.go index 827c35c1..8a918abb 100644 --- a/internal/logstore/chlogstore/chlogstore_test.go +++ b/internal/logstore/chlogstore/chlogstore_test.go @@ -3,13 +3,16 @@ package chlogstore import ( "context" "testing" + "time" "github.com/hookdeck/outpost/internal/clickhouse" "github.com/hookdeck/outpost/internal/logstore/driver" "github.com/hookdeck/outpost/internal/logstore/drivertest" "github.com/hookdeck/outpost/internal/migrator" + "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -113,6 +116,152 @@ func newHarnessWithDeploymentID(ctx context.Context, t *testing.T) (drivertest.H }, nil } +// TestEventDedup verifies that duplicate event rows are prevented at the write +// path and deduplicated at the read path. The dataset models a realistic webhook +// delivery scenario: +// +// ── Dataset ────────────────────────────────────────────────────────────────── +// +// Event A ("order.created"): Fanout to 3 destinations in one batch. +// Batch 1: dest-a #1 success, dest-b #1 failed, dest-c #1 success +// Batch 2: dest-b #2 success (retry) +// +// Event B ("user.signup"): Single destination, fails twice then succeeds. +// Batch 3: dest-a #1 failed +// Batch 4: dest-a #2 failed (retry) +// Batch 5: dest-a #3 success (retry) +// +// Event C ("payment.received"): Single destination, succeeds first try. +// Batch 6: dest-b #1 success +// +// ── Expected ───────────────────────────────────────────────────────────────── +// Event rows: 3 (one per unique event, retries skipped by write path) +// Attempt rows: 8 (A×4 + B×3 + C×1, all persisted) +// ListEvent: 3 unique events (LIMIT 1 BY deduplicates any stragglers) +// +// After injecting legacy duplicates (raw batch inserts simulating pre-fix data): +// Event rows: 9 (3 original + 6 injected) +// ListEvent: still 3 (read-path dedup) +func TestEventDedup(t *testing.T) { + testutil.CheckIntegrationTest(t) + t.Parallel() + + ctx := context.Background() + chDB := setupClickHouseConnection(t) + defer chDB.Close() + + logStore := NewLogStore(chDB, "") + + tenantID := "dedup-tenant" + baseTime := time.Now().Truncate(time.Second) + + eventA := &models.Event{ + ID: "evt-a", TenantID: tenantID, + MatchedDestinationIDs: []string{"dest-a", "dest-b", "dest-c"}, + Topic: "order.created", EligibleForRetry: true, + Time: baseTime, Data: []byte(`{"order_id":"100"}`), + } + eventB := &models.Event{ + ID: "evt-b", TenantID: tenantID, + MatchedDestinationIDs: []string{"dest-a"}, + Topic: "user.signup", EligibleForRetry: true, + Time: baseTime.Add(-1 * time.Minute), Data: []byte(`{"user_id":"42"}`), + } + eventC := &models.Event{ + ID: "evt-c", TenantID: tenantID, + MatchedDestinationIDs: []string{"dest-b"}, + Topic: "payment.received", EligibleForRetry: false, + Time: baseTime.Add(-2 * time.Minute), Data: []byte(`{"amount":99}`), + } + + att := func(id, eventID, destID string, num int, status string, t time.Time) *models.Attempt { + return &models.Attempt{ + ID: id, TenantID: tenantID, EventID: eventID, + DestinationID: destID, AttemptNumber: num, Status: status, Time: t, + } + } + + // Batch 1: Event A fanout — 3 destinations + require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{ + {Event: eventA, Attempt: att("att-a1", "evt-a", "dest-a", 1, "success", baseTime)}, + {Event: eventA, Attempt: att("att-a2", "evt-a", "dest-b", 1, "failed", baseTime)}, + {Event: eventA, Attempt: att("att-a3", "evt-a", "dest-c", 1, "success", baseTime)}, + })) + // Batch 2: Event A retry for dest-b + require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{ + {Event: eventA, Attempt: att("att-a4", "evt-a", "dest-b", 2, "success", baseTime.Add(time.Second))}, + })) + // Batch 3-5: Event B — fails, retries, succeeds + require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{ + {Event: eventB, Attempt: att("att-b1", "evt-b", "dest-a", 1, "failed", baseTime.Add(-1*time.Minute))}, + })) + require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{ + {Event: eventB, Attempt: att("att-b2", "evt-b", "dest-a", 2, "failed", baseTime.Add(-1*time.Minute+time.Second))}, + })) + require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{ + {Event: eventB, Attempt: att("att-b3", "evt-b", "dest-a", 3, "success", baseTime.Add(-1*time.Minute+2*time.Second))}, + })) + // Batch 6: Event C — single success + require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{ + {Event: eventC, Attempt: att("att-c1", "evt-c", "dest-b", 1, "success", baseTime.Add(-2*time.Minute))}, + })) + + // ── Write-path: raw row counts ────────────────────────────────────── + + var eventRows uint64 + row := chDB.QueryRow(ctx, "SELECT count() FROM events WHERE tenant_id = ?", tenantID) + require.NoError(t, row.Scan(&eventRows)) + assert.Equal(t, uint64(3), eventRows, "retries should not re-insert event rows") + + var attemptRows uint64 + row = chDB.QueryRow(ctx, "SELECT count() FROM attempts WHERE tenant_id = ?", tenantID) + require.NoError(t, row.Scan(&attemptRows)) + assert.Equal(t, uint64(8), attemptRows, "all attempts persisted") + + // ── Read-path: ListEvent deduplicates ──────────────────────────────── + + startTime := baseTime.Add(-10 * time.Minute) + resp, err := logStore.ListEvent(ctx, driver.ListEventRequest{ + TenantIDs: []string{tenantID}, + Limit: 100, + TimeFilter: driver.TimeFilter{GTE: &startTime}, + }) + require.NoError(t, err) + assert.Len(t, resp.Data, 3, "ListEvent returns 3 unique events") + + seen := map[string]bool{} + for _, evt := range resp.Data { + assert.False(t, seen[evt.ID], "duplicate in ListEvent: %s", evt.ID) + seen[evt.ID] = true + } + assert.True(t, seen["evt-a"]) + assert.True(t, seen["evt-b"]) + assert.True(t, seen["evt-c"]) + + // ── Read-path with legacy duplicates ───────────────────────────────── + // Inject duplicate event rows via raw batch inserts (simulates pre-fix data). + + for range 3 { + batch, batchErr := chDB.PrepareBatch(ctx, `INSERT INTO events (event_id, tenant_id, matched_destination_ids, topic, eligible_for_retry, event_time, metadata, data)`) + require.NoError(t, batchErr) + require.NoError(t, batch.Append("evt-a", tenantID, []string{"dest-a", "dest-b", "dest-c"}, "order.created", true, baseTime, "{}", `{"order_id":"100"}`)) + require.NoError(t, batch.Append("evt-b", tenantID, []string{"dest-a"}, "user.signup", true, baseTime.Add(-1*time.Minute), "{}", `{"user_id":"42"}`)) + require.NoError(t, batch.Send()) + } + + row = chDB.QueryRow(ctx, "SELECT count() FROM events WHERE tenant_id = ?", tenantID) + require.NoError(t, row.Scan(&eventRows)) + assert.Equal(t, uint64(9), eventRows, "3 original + 6 injected legacy duplicates") + + resp, err = logStore.ListEvent(ctx, driver.ListEventRequest{ + TenantIDs: []string{tenantID}, + Limit: 100, + TimeFilter: driver.TimeFilter{GTE: &startTime}, + }) + require.NoError(t, err) + assert.Len(t, resp.Data, 3, "LIMIT 1 BY deduplicates legacy rows") +} + func setupClickHouseConnectionWithDeploymentID(t *testing.T, deploymentID string) clickhouse.DB { t.Helper() t.Cleanup(testinfra.Start(t)) From 3696fab9a59e8fdfbd67812086c0189a6eb89965 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 17 Apr 2026 16:24:20 +0700 Subject: [PATCH 3/8] fix: update metrics test dataset to use attempt_number=1 for all entries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each entry in the dataset is a unique event, not a retry — so attempt_number should be 1. The old cycling (i%4+1) was unrealistic and incompatible with the write-path fix that skips event inserts for AttemptNumber > 1. Updated derived totals and assertions: - first_attempt_count: 75 → 270 (300 - 30 manual) - retry_count: 225 → 0 - avg_attempt_number: 2.5 → 1.0 - by_attempt_number: 4 buckets × 75 → 1 bucket × 300 - filter_by_attempt_number=1: 75 → 300 Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/logstore/chlogstore/chlogstore.go | 3 +++ .../drivertest/metrics_data_correctness.go | 17 +++++++---------- internal/logstore/drivertest/metrics_dataset.go | 14 +++++--------- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index 164506ea..ce6d276a 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -383,6 +383,8 @@ func buildAttemptQuery(table string, req driver.ListAttemptRequest, q pagination orderByClause := fmt.Sprintf("ORDER BY attempt_time %s, attempt_id %s", strings.ToUpper(q.SortDir), strings.ToUpper(q.SortDir)) + // LIMIT 1 BY attempt_id deduplicates rows that can appear when a message is + // nacked after a successful CH insert and then re-delivered. query := fmt.Sprintf(` SELECT event_id, @@ -404,6 +406,7 @@ func buildAttemptQuery(table string, req driver.ListAttemptRequest, q pagination FROM %s WHERE %s %s + LIMIT 1 BY attempt_id LIMIT %d `, table, whereClause, orderByClause, q.Limit) diff --git a/internal/logstore/drivertest/metrics_data_correctness.go b/internal/logstore/drivertest/metrics_data_correctness.go index c7d3a349..c40de3d5 100644 --- a/internal/logstore/drivertest/metrics_data_correctness.go +++ b/internal/logstore/drivertest/metrics_data_correctness.go @@ -371,10 +371,10 @@ func testMetricsDataCorrectness(t *testing.T, ctx context.Context, logStore driv require.NotNil(t, dp.RetryCount) require.NotNil(t, dp.ManualRetryCount) require.NotNil(t, dp.AvgAttemptNumber) - assert.Equal(t, 75, *dp.FirstAttemptCount) - assert.Equal(t, 225, *dp.RetryCount) + assert.Equal(t, 270, *dp.FirstAttemptCount) // 300 - 30 manual + assert.Equal(t, 0, *dp.RetryCount) assert.Equal(t, 30, *dp.ManualRetryCount) - assert.InDelta(t, 2.5, *dp.AvgAttemptNumber, 0.001) + assert.InDelta(t, 1.0, *dp.AvgAttemptNumber, 0.001) }) t.Run("rate no granularity", func(t *testing.T) { @@ -482,7 +482,7 @@ func testMetricsDataCorrectness(t *testing.T, ctx context.Context, logStore driv Dimensions: []string{"attempt_number"}, }) require.NoError(t, err) - assert.Len(t, resp.Data, 4) + assert.Len(t, resp.Data, 1) ac := map[int]int{} for _, dp := range resp.Data { @@ -490,11 +490,8 @@ func testMetricsDataCorrectness(t *testing.T, ctx context.Context, logStore driv require.NotNil(t, dp.Count) ac[*dp.AttemptNumber] = *dp.Count } - // attempt_number = i % 4 + 1 → each value appears 75 times - assert.Equal(t, 75, ac[1]) - assert.Equal(t, 75, ac[2]) - assert.Equal(t, 75, ac[3]) - assert.Equal(t, 75, ac[4]) + // All entries have attempt_number=1 + assert.Equal(t, 300, ac[1]) }) t.Run("by code", func(t *testing.T) { @@ -576,7 +573,7 @@ func testMetricsDataCorrectness(t *testing.T, ctx context.Context, logStore driv require.NoError(t, err) require.Len(t, resp.Data, 1) require.NotNil(t, resp.Data[0].Count) - assert.Equal(t, 75, *resp.Data[0].Count) + assert.Equal(t, 300, *resp.Data[0].Count) }) t.Run("granularity 1h on dense day", func(t *testing.T) { diff --git a/internal/logstore/drivertest/metrics_dataset.go b/internal/logstore/drivertest/metrics_dataset.go index 18efd521..0942037c 100644 --- a/internal/logstore/drivertest/metrics_dataset.go +++ b/internal/logstore/drivertest/metrics_dataset.go @@ -58,14 +58,10 @@ import ( // status: i % 5 → 0,1,2=success, 3,4=failed // code: success → i%2==0 ? "200" : "201" // failed → i%2==0 ? "500" : "422" -// attempt_number: i % 4 + 1 → 1,2,3,4 +// attempt_number: 1 (each entry is a unique event, not a retry) // manual: i % 10 == 9 // eligible_for_retry: i % 3 != 2 // -// FIXME: manual retries should always have attempt_number=1 (they start a new -// chain), but this dataset assigns them independently. Update the formula so -// manual=true implies attempt_number=1, then fix derived totals and assertions. -// // ── Derived Totals (Tenant 1, all 300) ─────────────────────────────────── // // Event metrics: @@ -83,10 +79,10 @@ import ( // successful_rate (no gran): 180/2678400 // failed_rate (no gran): 120/2678400 // by code: 200=90, 201=90, 500=60, 422=60 -// first_attempt (i%4+1==1): 75 -// retry (i%4+1>1): 225 +// first_attempt (attempt_number==1 AND !manual): 270 +// retry (attempt_number>1): 0 // manual (i%10==9): 30 -// avg_attempt_number: 750/300 = 2.5 +// avg_attempt_number: 1.0 // // Dense day — Jan 15 (250 events, indices 50..299): // hourly buckets: 10:00→25, 11:00→50, 12:00→100, 13:00→50, 14:00→25 @@ -179,7 +175,7 @@ func buildMetricsDataset() *metricsDataset { status = "failed" } code := codes[status][idx%2] - attemptNum := idx%4 + 1 + attemptNum := 1 // Each entry is a unique event, not a retry manual := idx%10 == 9 eligible := idx%3 != 2 From 21b48b351447a0c4cae2c42911522928f745e1f1 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 17 Apr 2026 19:49:09 +0700 Subject: [PATCH 4/8] refactor: replace LIMIT 1 BY with client-side dedup loop Remove LIMIT 1 BY event_id/attempt_id from ListEvent/ListAttempt queries. Instead, deduplicate results in Go after fetching. If duplicates reduce the result count below the requested limit, the loop advances the cursor and fetches more rows. This avoids the performance penalty of LIMIT 1 BY (equivalent to GROUP BY) on large result sets while still hiding duplicates from unmerged ReplacingMergeTree parts. In practice, duplicates are rare after the write-path fix, so the loop almost never executes more than once. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/logstore/chlogstore/chlogstore.go | 91 +++++++++++++++++----- 1 file changed, 70 insertions(+), 21 deletions(-) diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index ce6d276a..4547e962 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -43,6 +43,62 @@ func NewLogStore(chDB clickhouse.DB, deploymentID string) driver.LogStore { } } +// fetchAndDedup queries ClickHouse and deduplicates results by a key. If +// duplicates reduce the result count below the requested limit, it advances +// the cursor and fetches more rows until the limit is met or data is exhausted. +// This avoids LIMIT 1 BY / GROUP BY on large result sets while still hiding +// duplicates from unmerged ReplacingMergeTree parts. +func fetchAndDedup[T any]( + ctx context.Context, + chDB clickhouse.DB, + q pagination.QueryInput, + buildQuery func(pagination.QueryInput) (string, []any), + scan func(clickhouse.Rows) ([]T, error), + getID func(T) string, + getCursorPos func(T) string, +) ([]T, error) { + seen := make(map[string]bool) + var deduped []T + cursorPos := q.CursorPos + + for len(deduped) < q.Limit { + qi := pagination.QueryInput{ + Limit: q.Limit, + Compare: q.Compare, + SortDir: q.SortDir, + CursorPos: cursorPos, + } + query, args := buildQuery(qi) + rows, err := chDB.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("query failed: %w", err) + } + scanned, err := scan(rows) + rows.Close() + if err != nil { + return nil, err + } + + for _, item := range scanned { + id := getID(item) + if !seen[id] { + seen[id] = true + deduped = append(deduped, item) + } + } + + // Fewer rows than requested means we've exhausted the data. + if len(scanned) < q.Limit { + break + } + + // Advance cursor past last scanned row for next iteration. + cursorPos = getCursorPos(scanned[len(scanned)-1]) + } + + return deduped, nil +} + // eventWithPosition wraps an event with its cursor position data. type eventWithPosition struct { *models.Event @@ -66,13 +122,13 @@ func (s *logStoreImpl) ListEvent(ctx context.Context, req driver.ListEventReques Next: req.Next, Prev: req.Prev, Fetch: func(ctx context.Context, q pagination.QueryInput) ([]eventWithPosition, error) { - query, args := buildEventQuery(s.eventsTable, req, q) - rows, err := s.chDB.Query(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("query failed: %w", err) - } - defer rows.Close() - return scanEvents(rows) + return fetchAndDedup(ctx, s.chDB, q, func(qi pagination.QueryInput) (string, []any) { + return buildEventQuery(s.eventsTable, req, qi) + }, scanEvents, func(e eventWithPosition) string { + return e.Event.ID + }, func(e eventWithPosition) string { + return fmt.Sprintf("%d::%s", e.eventTime.UnixMilli(), e.Event.ID) + }) }, Cursor: pagination.Cursor[eventWithPosition]{ Encode: func(e eventWithPosition) string { @@ -156,9 +212,6 @@ func buildEventQuery(table string, req driver.ListEventRequest, q pagination.Que orderByClause := fmt.Sprintf("ORDER BY event_time %s, event_id %s", strings.ToUpper(q.SortDir), strings.ToUpper(q.SortDir)) - // We omit FINAL to avoid forcing ClickHouse to merge all parts at query time. - // Instead, LIMIT 1 BY event_id deduplicates in the result stream: after ORDER BY, - // it keeps only the first row per event_id, then the outer LIMIT caps the result set. query := fmt.Sprintf(` SELECT event_id, @@ -172,7 +225,6 @@ func buildEventQuery(table string, req driver.ListEventRequest, q pagination.Que FROM %s WHERE %s %s - LIMIT 1 BY event_id LIMIT %d `, table, whereClause, orderByClause, q.Limit) @@ -283,13 +335,13 @@ func (s *logStoreImpl) ListAttempt(ctx context.Context, req driver.ListAttemptRe Next: req.Next, Prev: req.Prev, Fetch: func(ctx context.Context, q pagination.QueryInput) ([]attemptRecordWithPosition, error) { - query, args := buildAttemptQuery(s.attemptsTable, req, q) - rows, err := s.chDB.Query(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("query failed: %w", err) - } - defer rows.Close() - return scanAttemptRecords(rows) + return fetchAndDedup(ctx, s.chDB, q, func(qi pagination.QueryInput) (string, []any) { + return buildAttemptQuery(s.attemptsTable, req, qi) + }, scanAttemptRecords, func(ar attemptRecordWithPosition) string { + return ar.Attempt.ID + }, func(ar attemptRecordWithPosition) string { + return fmt.Sprintf("%d::%s", ar.attemptTime.UnixMilli(), ar.Attempt.ID) + }) }, Cursor: pagination.Cursor[attemptRecordWithPosition]{ Encode: func(ar attemptRecordWithPosition) string { @@ -383,8 +435,6 @@ func buildAttemptQuery(table string, req driver.ListAttemptRequest, q pagination orderByClause := fmt.Sprintf("ORDER BY attempt_time %s, attempt_id %s", strings.ToUpper(q.SortDir), strings.ToUpper(q.SortDir)) - // LIMIT 1 BY attempt_id deduplicates rows that can appear when a message is - // nacked after a successful CH insert and then re-delivered. query := fmt.Sprintf(` SELECT event_id, @@ -406,7 +456,6 @@ func buildAttemptQuery(table string, req driver.ListAttemptRequest, q pagination FROM %s WHERE %s %s - LIMIT 1 BY attempt_id LIMIT %d `, table, whereClause, orderByClause, q.Limit) From 022c803dc751506e869df18b368f1d957c3cd6f9 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 17 Apr 2026 21:43:31 +0700 Subject: [PATCH 5/8] test: add TestFetchAndDedupTruncation for limit overshoot bug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When duplicates cause the first batch to yield fewer unique items, the next batch appends all its unique items without checking the limit — returning more than requested. This test exposes the bug by inserting 1 duplicated event + 3 unique events and requesting limit=2 via fetchAndDedup directly (bypassing pagination.Run which masks the issue with its own truncation). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../logstore/chlogstore/chlogstore_test.go | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/internal/logstore/chlogstore/chlogstore_test.go b/internal/logstore/chlogstore/chlogstore_test.go index 8a918abb..65d0abec 100644 --- a/internal/logstore/chlogstore/chlogstore_test.go +++ b/internal/logstore/chlogstore/chlogstore_test.go @@ -2,12 +2,14 @@ package chlogstore import ( "context" + "fmt" "testing" "time" "github.com/hookdeck/outpost/internal/clickhouse" "github.com/hookdeck/outpost/internal/logstore/driver" "github.com/hookdeck/outpost/internal/logstore/drivertest" + "github.com/hookdeck/outpost/internal/pagination" "github.com/hookdeck/outpost/internal/migrator" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/util/testinfra" @@ -262,6 +264,62 @@ func TestEventDedup(t *testing.T) { assert.Len(t, resp.Data, 3, "LIMIT 1 BY deduplicates legacy rows") } +// TestFetchAndDedupTruncation verifies that fetchAndDedup never returns more +// items than the requested limit. When duplicates cause the first batch to +// yield fewer unique items, the next batch may overshoot the limit without +// truncation. +// +// Setup: 1 event duplicated 2× plus 3 unique events. With limit=2 and DESC: +// +// Batch 1: [A, A] → dedup → [A] (1 < 2, loop continues) +// Batch 2: [B, C] → dedup → [A, B, C] (3 items — exceeds limit) +func TestFetchAndDedupTruncation(t *testing.T) { + testutil.CheckIntegrationTest(t) + t.Parallel() + + ctx := context.Background() + chDB := setupClickHouseConnection(t) + defer chDB.Close() + + tenantID := "dedup-truncation" + baseTime := time.Now().Truncate(time.Second) + + insertRawEvent := func(id string, eventTime time.Time) { + batch, err := chDB.PrepareBatch(ctx, `INSERT INTO events (event_id, tenant_id, matched_destination_ids, topic, eligible_for_retry, event_time, metadata, data)`) + require.NoError(t, err) + require.NoError(t, batch.Append(id, tenantID, []string{"dest-1"}, "test.topic", true, eventTime, "{}", `{}`)) + require.NoError(t, batch.Send()) + } + + // Two rows for event A (same ID = duplicate in separate parts) + insertRawEvent("evt-trunc-a", baseTime) + insertRawEvent("evt-trunc-a", baseTime) + // Unique events at decreasing times + insertRawEvent("evt-trunc-b", baseTime.Add(-1*time.Second)) + insertRawEvent("evt-trunc-c", baseTime.Add(-2*time.Second)) + insertRawEvent("evt-trunc-d", baseTime.Add(-3*time.Second)) + + startTime := baseTime.Add(-10 * time.Minute) + limit := 2 + result, err := fetchAndDedup(ctx, chDB, pagination.QueryInput{ + Limit: limit, + Compare: "<", + SortDir: "desc", + }, func(qi pagination.QueryInput) (string, []any) { + return buildEventQuery("events", driver.ListEventRequest{ + TenantIDs: []string{tenantID}, + TimeFilter: driver.TimeFilter{GTE: &startTime}, + }, qi) + }, scanEvents, func(e eventWithPosition) string { + return e.Event.ID + }, func(e eventWithPosition) string { + return fmt.Sprintf("%d::%s", e.eventTime.UnixMilli(), e.Event.ID) + }) + require.NoError(t, err) + assert.LessOrEqual(t, len(result), limit, + "fetchAndDedup must not return more items than the requested limit") +} + func setupClickHouseConnectionWithDeploymentID(t *testing.T, deploymentID string) clickhouse.DB { t.Helper() t.Cleanup(testinfra.Start(t)) From b6aafd8febd0a9b70fdaab2a63c9a699dc224588 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 17 Apr 2026 21:43:43 +0700 Subject: [PATCH 6/8] fix: truncate fetchAndDedup results to requested limit The dedup loop appends all unique items from each batch before rechecking the limit, so the last batch can overshoot. Truncate the result slice before returning. pagination.Run already truncates downstream, but fetchAndDedup should be correct on its own. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/logstore/chlogstore/chlogstore.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index 4547e962..9444b687 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -96,6 +96,12 @@ func fetchAndDedup[T any]( cursorPos = getCursorPos(scanned[len(scanned)-1]) } + // Truncate to the requested limit — the last batch may have added + // more unique items than needed to reach the limit. + if len(deduped) > q.Limit { + deduped = deduped[:q.Limit] + } + return deduped, nil } From a24725530e91eb7bfe18e721b050714cf4fbe55c Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 17 Apr 2026 21:44:48 +0700 Subject: [PATCH 7/8] refactor: extract cursorPosition methods, fix stale LIMIT 1 BY comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add cursorPosition() methods to eventWithPosition and attemptRecordWithPosition so the cursor format string lives in one place. Previously getCursorPos lambdas duplicated the format from Cursor.Encode — if one changed, the dedup loop would silently advance to invalid cursor positions. Also update test comments/messages that still referenced LIMIT 1 BY (removed in the previous client-side dedup commit). Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/logstore/chlogstore/chlogstore.go | 22 ++++++++++--------- .../logstore/chlogstore/chlogstore_test.go | 9 +++----- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index 9444b687..9d4b4d86 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -111,6 +111,10 @@ type eventWithPosition struct { eventTime time.Time } +func (e eventWithPosition) cursorPosition() string { + return fmt.Sprintf("%d::%s", e.eventTime.UnixMilli(), e.Event.ID) +} + func (s *logStoreImpl) ListEvent(ctx context.Context, req driver.ListEventRequest) (driver.ListEventResponse, error) { sortOrder := req.SortOrder if sortOrder != "asc" && sortOrder != "desc" { @@ -132,14 +136,11 @@ func (s *logStoreImpl) ListEvent(ctx context.Context, req driver.ListEventReques return buildEventQuery(s.eventsTable, req, qi) }, scanEvents, func(e eventWithPosition) string { return e.Event.ID - }, func(e eventWithPosition) string { - return fmt.Sprintf("%d::%s", e.eventTime.UnixMilli(), e.Event.ID) - }) + }, eventWithPosition.cursorPosition) }, Cursor: pagination.Cursor[eventWithPosition]{ Encode: func(e eventWithPosition) string { - position := fmt.Sprintf("%d::%s", e.eventTime.UnixMilli(), e.Event.ID) - return cursor.Encode(cursorResourceEvent, cursorVersion, position) + return cursor.Encode(cursorResourceEvent, cursorVersion, e.cursorPosition()) }, Decode: func(c string) (string, error) { return cursor.Decode(c, cursorResourceEvent, cursorVersion) @@ -324,6 +325,10 @@ type attemptRecordWithPosition struct { attemptTime time.Time } +func (ar attemptRecordWithPosition) cursorPosition() string { + return fmt.Sprintf("%d::%s", ar.attemptTime.UnixMilli(), ar.Attempt.ID) +} + func (s *logStoreImpl) ListAttempt(ctx context.Context, req driver.ListAttemptRequest) (driver.ListAttemptResponse, error) { sortOrder := req.SortOrder if sortOrder != "asc" && sortOrder != "desc" { @@ -345,14 +350,11 @@ func (s *logStoreImpl) ListAttempt(ctx context.Context, req driver.ListAttemptRe return buildAttemptQuery(s.attemptsTable, req, qi) }, scanAttemptRecords, func(ar attemptRecordWithPosition) string { return ar.Attempt.ID - }, func(ar attemptRecordWithPosition) string { - return fmt.Sprintf("%d::%s", ar.attemptTime.UnixMilli(), ar.Attempt.ID) - }) + }, attemptRecordWithPosition.cursorPosition) }, Cursor: pagination.Cursor[attemptRecordWithPosition]{ Encode: func(ar attemptRecordWithPosition) string { - position := fmt.Sprintf("%d::%s", ar.attemptTime.UnixMilli(), ar.Attempt.ID) - return cursor.Encode(cursorResourceAttempt, cursorVersion, position) + return cursor.Encode(cursorResourceAttempt, cursorVersion, ar.cursorPosition()) }, Decode: func(c string) (string, error) { return cursor.Decode(c, cursorResourceAttempt, cursorVersion) diff --git a/internal/logstore/chlogstore/chlogstore_test.go b/internal/logstore/chlogstore/chlogstore_test.go index 65d0abec..d50a8931 100644 --- a/internal/logstore/chlogstore/chlogstore_test.go +++ b/internal/logstore/chlogstore/chlogstore_test.go @@ -2,7 +2,6 @@ package chlogstore import ( "context" - "fmt" "testing" "time" @@ -139,7 +138,7 @@ func newHarnessWithDeploymentID(ctx context.Context, t *testing.T) (drivertest.H // ── Expected ───────────────────────────────────────────────────────────────── // Event rows: 3 (one per unique event, retries skipped by write path) // Attempt rows: 8 (A×4 + B×3 + C×1, all persisted) -// ListEvent: 3 unique events (LIMIT 1 BY deduplicates any stragglers) +// ListEvent: 3 unique events (client-side dedup hides any stragglers) // // After injecting legacy duplicates (raw batch inserts simulating pre-fix data): // Event rows: 9 (3 original + 6 injected) @@ -261,7 +260,7 @@ func TestEventDedup(t *testing.T) { TimeFilter: driver.TimeFilter{GTE: &startTime}, }) require.NoError(t, err) - assert.Len(t, resp.Data, 3, "LIMIT 1 BY deduplicates legacy rows") + assert.Len(t, resp.Data, 3, "client-side dedup hides legacy rows") } // TestFetchAndDedupTruncation verifies that fetchAndDedup never returns more @@ -312,9 +311,7 @@ func TestFetchAndDedupTruncation(t *testing.T) { }, qi) }, scanEvents, func(e eventWithPosition) string { return e.Event.ID - }, func(e eventWithPosition) string { - return fmt.Sprintf("%d::%s", e.eventTime.UnixMilli(), e.Event.ID) - }) + }, eventWithPosition.cursorPosition) require.NoError(t, err) assert.LessOrEqual(t, len(result), limit, "fetchAndDedup must not return more items than the requested limit") From da49bca6cee6f84fa3829d74c0152e2f98d14326 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 17 Apr 2026 21:45:16 +0700 Subject: [PATCH 8/8] fix: cap fetchAndDedup loop to 10 iterations Prevents runaway queries against pathological data with extreme duplication. In normal operation the loop executes once; with the write-path fix, duplicates only arise from unmerged ReplacingMergeTree parts. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/logstore/chlogstore/chlogstore.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index 9d4b4d86..ce5fbf83 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -43,6 +43,12 @@ func NewLogStore(chDB clickhouse.DB, deploymentID string) driver.LogStore { } } +// maxDedupIterations caps the number of fetch rounds in fetchAndDedup. +// In practice the loop almost never exceeds 1 iteration (duplicates are rare +// after the write-path fix), but this prevents runaway queries against +// pathological data with extreme duplication. +const maxDedupIterations = 10 + // fetchAndDedup queries ClickHouse and deduplicates results by a key. If // duplicates reduce the result count below the requested limit, it advances // the cursor and fetches more rows until the limit is met or data is exhausted. @@ -61,7 +67,7 @@ func fetchAndDedup[T any]( var deduped []T cursorPos := q.CursorPos - for len(deduped) < q.Limit { + for iter := 0; len(deduped) < q.Limit && iter < maxDedupIterations; iter++ { qi := pagination.QueryInput{ Limit: q.Limit, Compare: q.Compare,