Skip to content
118 changes: 94 additions & 24 deletions internal/logstore/chlogstore/chlogstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,84 @@ func NewLogStore(chDB clickhouse.DB, deploymentID string) driver.LogStore {
}
}

// maxDedupIterations bounds how many query rounds fetchAndDedup will run.
// Duplicates are rare after the write-path fix, so a single round is the
// overwhelmingly common case; the cap only guards against runaway querying
// over pathological, heavily-duplicated data.
const maxDedupIterations = 10

// fetchAndDedup pages through ClickHouse results, collapsing rows that share
// an ID. Whenever duplicates leave the unique count short of the requested
// limit, the cursor is advanced past the last scanned row and another batch is
// fetched, until either the limit is satisfied or the data runs out. Doing the
// dedup client-side avoids LIMIT 1 BY / GROUP BY over large result sets while
// still hiding duplicates coming from unmerged ReplacingMergeTree parts.
func fetchAndDedup[T any](
	ctx context.Context,
	chDB clickhouse.DB,
	q pagination.QueryInput,
	buildQuery func(pagination.QueryInput) (string, []any),
	scan func(clickhouse.Rows) ([]T, error),
	getID func(T) string,
	getCursorPos func(T) string,
) ([]T, error) {
	var (
		unique []T
		known  = make(map[string]struct{})
		pos    = q.CursorPos
	)

	for round := 0; round < maxDedupIterations && len(unique) < q.Limit; round++ {
		query, args := buildQuery(pagination.QueryInput{
			Limit:     q.Limit,
			Compare:   q.Compare,
			SortDir:   q.SortDir,
			CursorPos: pos,
		})
		rows, err := chDB.Query(ctx, query, args...)
		if err != nil {
			return nil, fmt.Errorf("query failed: %w", err)
		}
		batch, err := scan(rows)
		rows.Close()
		if err != nil {
			return nil, err
		}

		// Keep only first occurrences of each ID.
		for _, item := range batch {
			id := getID(item)
			if _, dup := known[id]; dup {
				continue
			}
			known[id] = struct{}{}
			unique = append(unique, item)
		}

		// A short batch means the data is exhausted — stop here. This also
		// guarantees the cursor advance below never indexes an empty batch.
		if len(batch) < q.Limit {
			break
		}

		// Resume the next round just past the last row we scanned.
		pos = getCursorPos(batch[len(batch)-1])
	}

	// The final batch may have pushed the unique count past the limit;
	// trim so callers never receive more than they asked for.
	if len(unique) > q.Limit {
		unique = unique[:q.Limit]
	}

	return unique, nil
}

// eventWithPosition wraps an event with its cursor position data.
// eventTime holds the timestamp used for cursor ordering — presumably the
// event_time column populated by scanEvents (NOTE(review): scanEvents is not
// visible here; confirm it fills eventTime from that column).
type eventWithPosition struct {
	*models.Event
	eventTime time.Time
}

// cursorPosition renders this event's pagination position as
// "<unix_ms>::<event_id>" — the same string ListEvent encodes into cursors.
func (e eventWithPosition) cursorPosition() string {
	return fmt.Sprintf("%d::%s", e.eventTime.UnixMilli(), e.Event.ID)
}

func (s *logStoreImpl) ListEvent(ctx context.Context, req driver.ListEventRequest) (driver.ListEventResponse, error) {
sortOrder := req.SortOrder
if sortOrder != "asc" && sortOrder != "desc" {
Expand All @@ -66,18 +138,15 @@ func (s *logStoreImpl) ListEvent(ctx context.Context, req driver.ListEventReques
Next: req.Next,
Prev: req.Prev,
Fetch: func(ctx context.Context, q pagination.QueryInput) ([]eventWithPosition, error) {
query, args := buildEventQuery(s.eventsTable, req, q)
rows, err := s.chDB.Query(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("query failed: %w", err)
}
defer rows.Close()
return scanEvents(rows)
return fetchAndDedup(ctx, s.chDB, q, func(qi pagination.QueryInput) (string, []any) {
return buildEventQuery(s.eventsTable, req, qi)
}, scanEvents, func(e eventWithPosition) string {
return e.Event.ID
}, eventWithPosition.cursorPosition)
},
Cursor: pagination.Cursor[eventWithPosition]{
Encode: func(e eventWithPosition) string {
position := fmt.Sprintf("%d::%s", e.eventTime.UnixMilli(), e.Event.ID)
return cursor.Encode(cursorResourceEvent, cursorVersion, position)
return cursor.Encode(cursorResourceEvent, cursorVersion, e.cursorPosition())
},
Decode: func(c string) (string, error) {
return cursor.Decode(c, cursorResourceEvent, cursorVersion)
Expand Down Expand Up @@ -156,10 +225,6 @@ func buildEventQuery(table string, req driver.ListEventRequest, q pagination.Que
orderByClause := fmt.Sprintf("ORDER BY event_time %s, event_id %s",
strings.ToUpper(q.SortDir), strings.ToUpper(q.SortDir))

// Note: We intentionally omit FINAL to avoid forcing ClickHouse to merge all parts
// before returning results. The events table uses ReplacingMergeTree, so duplicates
// may briefly appear before background merges consolidate them. This is acceptable
// for log viewing and maintains O(limit) query performance.
query := fmt.Sprintf(`
SELECT
event_id,
Expand Down Expand Up @@ -266,6 +331,10 @@ type attemptRecordWithPosition struct {
attemptTime time.Time
}

// cursorPosition renders this attempt's pagination position as
// "<unix_ms>::<attempt_id>" — the same string ListAttempt encodes into cursors.
func (ar attemptRecordWithPosition) cursorPosition() string {
	return fmt.Sprintf("%d::%s", ar.attemptTime.UnixMilli(), ar.Attempt.ID)
}

func (s *logStoreImpl) ListAttempt(ctx context.Context, req driver.ListAttemptRequest) (driver.ListAttemptResponse, error) {
sortOrder := req.SortOrder
if sortOrder != "asc" && sortOrder != "desc" {
Expand All @@ -283,18 +352,15 @@ func (s *logStoreImpl) ListAttempt(ctx context.Context, req driver.ListAttemptRe
Next: req.Next,
Prev: req.Prev,
Fetch: func(ctx context.Context, q pagination.QueryInput) ([]attemptRecordWithPosition, error) {
query, args := buildAttemptQuery(s.attemptsTable, req, q)
rows, err := s.chDB.Query(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("query failed: %w", err)
}
defer rows.Close()
return scanAttemptRecords(rows)
return fetchAndDedup(ctx, s.chDB, q, func(qi pagination.QueryInput) (string, []any) {
return buildAttemptQuery(s.attemptsTable, req, qi)
}, scanAttemptRecords, func(ar attemptRecordWithPosition) string {
return ar.Attempt.ID
}, attemptRecordWithPosition.cursorPosition)
},
Cursor: pagination.Cursor[attemptRecordWithPosition]{
Encode: func(ar attemptRecordWithPosition) string {
position := fmt.Sprintf("%d::%s", ar.attemptTime.UnixMilli(), ar.Attempt.ID)
return cursor.Encode(cursorResourceAttempt, cursorVersion, position)
return cursor.Encode(cursorResourceAttempt, cursorVersion, ar.cursorPosition())
},
Decode: func(c string) (string, error) {
return cursor.Decode(c, cursorResourceAttempt, cursorVersion)
Expand Down Expand Up @@ -700,10 +766,14 @@ func (s *logStoreImpl) InsertMany(ctx context.Context, entries []*models.LogEntr
return nil
}

// Extract and dedupe events by ID
// Extract and dedupe events by ID, skipping retry attempts.
// Retries (AttemptNumber > 1) carry identical event data — the event row
// already exists from the first attempt's batch.
eventMap := make(map[string]*models.Event)
for _, entry := range entries {
eventMap[entry.Event.ID] = entry.Event
if entry.Attempt.AttemptNumber <= 1 {
eventMap[entry.Event.ID] = entry.Event
}
}

if len(eventMap) > 0 {
Expand Down
204 changes: 204 additions & 0 deletions internal/logstore/chlogstore/chlogstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package chlogstore
import (
"context"
"testing"
"time"

"github.com/hookdeck/outpost/internal/clickhouse"
"github.com/hookdeck/outpost/internal/logstore/driver"
"github.com/hookdeck/outpost/internal/logstore/drivertest"
"github.com/hookdeck/outpost/internal/pagination"
"github.com/hookdeck/outpost/internal/migrator"
"github.com/hookdeck/outpost/internal/models"
"github.com/hookdeck/outpost/internal/util/testinfra"
"github.com/hookdeck/outpost/internal/util/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -113,6 +117,206 @@ func newHarnessWithDeploymentID(ctx context.Context, t *testing.T) (drivertest.H
}, nil
}

// TestEventDedup verifies that duplicate event rows are prevented at the write
// path and deduplicated at the read path. The dataset models a realistic webhook
// delivery scenario:
//
// ── Dataset ──────────────────────────────────────────────────────────────────
//
// Event A ("order.created"): Fanout to 3 destinations in one batch.
//	Batch 1: dest-a #1 success, dest-b #1 failed, dest-c #1 success
//	Batch 2: dest-b #2 success (retry)
//
// Event B ("user.signup"): Single destination, fails twice then succeeds.
//	Batch 3: dest-a #1 failed
//	Batch 4: dest-a #2 failed (retry)
//	Batch 5: dest-a #3 success (retry)
//
// Event C ("payment.received"): Single destination, succeeds first try.
//	Batch 6: dest-b #1 success
//
// ── Expected ─────────────────────────────────────────────────────────────────
// Event rows:   3 (one per unique event, retries skipped by write path)
// Attempt rows: 8 (A×4 + B×3 + C×1, all persisted)
// ListEvent:    3 unique events (client-side dedup hides any stragglers)
//
// After injecting legacy duplicates (raw batch inserts simulating pre-fix data):
// Event rows: 9 (3 original + 6 injected)
// ListEvent:  still 3 (read-path dedup)
func TestEventDedup(t *testing.T) {
	testutil.CheckIntegrationTest(t)
	t.Parallel()

	ctx := context.Background()
	chDB := setupClickHouseConnection(t)
	defer chDB.Close()

	// Empty deploymentID: the store targets the plain "events"/"attempts"
	// tables, which the raw count queries below read directly.
	logStore := NewLogStore(chDB, "")

	tenantID := "dedup-tenant"
	// Whole-second base time — NOTE(review): presumably truncated so inserted
	// timestamps survive the ClickHouse round-trip exactly; confirm precision.
	baseTime := time.Now().Truncate(time.Second)

	eventA := &models.Event{
		ID: "evt-a", TenantID: tenantID,
		MatchedDestinationIDs: []string{"dest-a", "dest-b", "dest-c"},
		Topic: "order.created", EligibleForRetry: true,
		Time: baseTime, Data: []byte(`{"order_id":"100"}`),
	}
	eventB := &models.Event{
		ID: "evt-b", TenantID: tenantID,
		MatchedDestinationIDs: []string{"dest-a"},
		Topic: "user.signup", EligibleForRetry: true,
		Time: baseTime.Add(-1 * time.Minute), Data: []byte(`{"user_id":"42"}`),
	}
	eventC := &models.Event{
		ID: "evt-c", TenantID: tenantID,
		MatchedDestinationIDs: []string{"dest-b"},
		Topic: "payment.received", EligibleForRetry: false,
		Time: baseTime.Add(-2 * time.Minute), Data: []byte(`{"amount":99}`),
	}

	// att builds an attempt row bound to this test's tenant.
	att := func(id, eventID, destID string, num int, status string, t time.Time) *models.Attempt {
		return &models.Attempt{
			ID: id, TenantID: tenantID, EventID: eventID,
			DestinationID: destID, AttemptNumber: num, Status: status, Time: t,
		}
	}

	// Batch 1: Event A fanout — 3 destinations
	require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{
		{Event: eventA, Attempt: att("att-a1", "evt-a", "dest-a", 1, "success", baseTime)},
		{Event: eventA, Attempt: att("att-a2", "evt-a", "dest-b", 1, "failed", baseTime)},
		{Event: eventA, Attempt: att("att-a3", "evt-a", "dest-c", 1, "success", baseTime)},
	}))
	// Batch 2: Event A retry for dest-b
	require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{
		{Event: eventA, Attempt: att("att-a4", "evt-a", "dest-b", 2, "success", baseTime.Add(time.Second))},
	}))
	// Batch 3-5: Event B — fails, retries, succeeds
	require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{
		{Event: eventB, Attempt: att("att-b1", "evt-b", "dest-a", 1, "failed", baseTime.Add(-1*time.Minute))},
	}))
	require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{
		{Event: eventB, Attempt: att("att-b2", "evt-b", "dest-a", 2, "failed", baseTime.Add(-1*time.Minute+time.Second))},
	}))
	require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{
		{Event: eventB, Attempt: att("att-b3", "evt-b", "dest-a", 3, "success", baseTime.Add(-1*time.Minute+2*time.Second))},
	}))
	// Batch 6: Event C — single success
	require.NoError(t, logStore.InsertMany(ctx, []*models.LogEntry{
		{Event: eventC, Attempt: att("att-c1", "evt-c", "dest-b", 1, "success", baseTime)},
	}))

	// ── Write-path: raw row counts ──────────────────────────────────────

	var eventRows uint64
	row := chDB.QueryRow(ctx, "SELECT count() FROM events WHERE tenant_id = ?", tenantID)
	require.NoError(t, row.Scan(&eventRows))
	assert.Equal(t, uint64(3), eventRows, "retries should not re-insert event rows")

	var attemptRows uint64
	row = chDB.QueryRow(ctx, "SELECT count() FROM attempts WHERE tenant_id = ?", tenantID)
	require.NoError(t, row.Scan(&attemptRows))
	assert.Equal(t, uint64(8), attemptRows, "all attempts persisted")

	// ── Read-path: ListEvent deduplicates ────────────────────────────────

	startTime := baseTime.Add(-10 * time.Minute)
	resp, err := logStore.ListEvent(ctx, driver.ListEventRequest{
		TenantIDs:  []string{tenantID},
		Limit:      100,
		TimeFilter: driver.TimeFilter{GTE: &startTime},
	})
	require.NoError(t, err)
	assert.Len(t, resp.Data, 3, "ListEvent returns 3 unique events")

	// Every returned event must be unique and one of the three inserted.
	seen := map[string]bool{}
	for _, evt := range resp.Data {
		assert.False(t, seen[evt.ID], "duplicate in ListEvent: %s", evt.ID)
		seen[evt.ID] = true
	}
	assert.True(t, seen["evt-a"])
	assert.True(t, seen["evt-b"])
	assert.True(t, seen["evt-c"])

	// ── Read-path with legacy duplicates ─────────────────────────────────
	// Inject duplicate event rows via raw batch inserts (simulates pre-fix data).

	for range 3 {
		batch, batchErr := chDB.PrepareBatch(ctx, `INSERT INTO events (event_id, tenant_id, matched_destination_ids, topic, eligible_for_retry, event_time, metadata, data)`)
		require.NoError(t, batchErr)
		require.NoError(t, batch.Append("evt-a", tenantID, []string{"dest-a", "dest-b", "dest-c"}, "order.created", true, baseTime, "{}", `{"order_id":"100"}`))
		require.NoError(t, batch.Append("evt-b", tenantID, []string{"dest-a"}, "user.signup", true, baseTime.Add(-1*time.Minute), "{}", `{"user_id":"42"}`))
		require.NoError(t, batch.Send())
	}

	row = chDB.QueryRow(ctx, "SELECT count() FROM events WHERE tenant_id = ?", tenantID)
	require.NoError(t, row.Scan(&eventRows))
	assert.Equal(t, uint64(9), eventRows, "3 original + 6 injected legacy duplicates")

	resp, err = logStore.ListEvent(ctx, driver.ListEventRequest{
		TenantIDs:  []string{tenantID},
		Limit:      100,
		TimeFilter: driver.TimeFilter{GTE: &startTime},
	})
	require.NoError(t, err)
	assert.Len(t, resp.Data, 3, "client-side dedup hides legacy rows")
}

// TestFetchAndDedupTruncation checks that fetchAndDedup caps its output at the
// requested limit even when a later fetch round overshoots it.
//
// Data: one event stored twice (same ID in separate parts) plus three unique
// events at strictly older timestamps, queried DESC with limit=2:
//
//	Round 1 returns [A, A] → 1 unique (below limit, keep fetching)
//	Round 2 returns [B, C] → 3 unique total, which must be trimmed to 2
func TestFetchAndDedupTruncation(t *testing.T) {
	testutil.CheckIntegrationTest(t)
	t.Parallel()

	ctx := context.Background()
	chDB := setupClickHouseConnection(t)
	defer chDB.Close()

	const tenantID = "dedup-truncation"
	baseTime := time.Now().Truncate(time.Second)

	// writeEventRow appends one raw row to the events table; calling it twice
	// with the same ID produces duplicate rows in separate parts.
	writeEventRow := func(eventID string, at time.Time) {
		batch, err := chDB.PrepareBatch(ctx, `INSERT INTO events (event_id, tenant_id, matched_destination_ids, topic, eligible_for_retry, event_time, metadata, data)`)
		require.NoError(t, err)
		require.NoError(t, batch.Append(eventID, tenantID, []string{"dest-1"}, "test.topic", true, at, "{}", `{}`))
		require.NoError(t, batch.Send())
	}

	// Duplicate rows for event A at the newest timestamp.
	writeEventRow("evt-trunc-a", baseTime)
	writeEventRow("evt-trunc-a", baseTime)
	// Unique events, each one second older than the last.
	for i, id := range []string{"evt-trunc-b", "evt-trunc-c", "evt-trunc-d"} {
		writeEventRow(id, baseTime.Add(-time.Duration(i+1)*time.Second))
	}

	startTime := baseTime.Add(-10 * time.Minute)
	const limit = 2
	got, err := fetchAndDedup(ctx, chDB, pagination.QueryInput{
		Limit:   limit,
		Compare: "<",
		SortDir: "desc",
	}, func(qi pagination.QueryInput) (string, []any) {
		return buildEventQuery("events", driver.ListEventRequest{
			TenantIDs:  []string{tenantID},
			TimeFilter: driver.TimeFilter{GTE: &startTime},
		}, qi)
	}, scanEvents, func(e eventWithPosition) string {
		return e.Event.ID
	}, eventWithPosition.cursorPosition)
	require.NoError(t, err)
	assert.LessOrEqual(t, len(got), limit,
		"fetchAndDedup must not return more items than the requested limit")
}

func setupClickHouseConnectionWithDeploymentID(t *testing.T, deploymentID string) clickhouse.DB {
t.Helper()
t.Cleanup(testinfra.Start(t))
Expand Down
Loading
Loading