From eb7b98fcdecbe4fc662ba07fe8a0c1bbf4115c85 Mon Sep 17 00:00:00 2001 From: Scott Gress Date: Tue, 5 May 2026 06:29:47 -0700 Subject: [PATCH] Optimize data collection: add index and batch deletes (#44692) **Related issue:** Resolves #44609 # Details This PR optimizes the historical data collection system in two ways: 1. Adds an additional index on the `host_scd_data` table allowing more efficient lookups of rows by their `valid_to`, to optimize both closing out open rows and deleting old rows 2. Implements batching in the job that deletes old rows, so that it no longer blocks writes if the collection job happens to run at the same time as the cleanup job # Checklist for submitter If some of the following don't apply, delete the relevant line. - [ ] Changes file added for user-visible changes in `changes/`, `orbit/changes/` or `ee/fleetd-chrome/changes`. See [Changes files](https://github.com/fleetdm/fleet/blob/main/docs/Contributing/guides/committing-changes.md#changes-files) for more information. n/a, unreleased - [X] Input data is properly validated, `SELECT *` is avoided, SQL injection is prevented (using placeholders for values in statements), JS inline code is prevented especially for url redirects, and untrusted data interpolated into shell scripts/commands is validated against shell metacharacters. 
- [ ] Timeouts are implemented and retries are limited to avoid infinite loops ## Testing - [ ] Added/updated automated tests - [X] Where appropriate, [automated tests simulate multiple hosts and test for host isolation](https://github.com/fleetdm/fleet/blob/main/docs/Contributing/reference/patterns-backend.md#unit-testing) (updates to one host's records do not affect another) - [X] QA'd all new/changed functionality manually SQL explains -- before: ``` +----+-------------+---------------+------------+------+---------------+------+---------+------+--------+----------+-------------+ | id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra | +----+-------------+---------------+------------+------+---------------+------+---------+------+--------+----------+-------------+ | 1 | DELETE | host_scd_data | NULL | ALL | NULL | NULL | NULL | NULL | 144320 | 100.00 | Using where | +----+-------------+---------------+------------+------+---------------+------+---------+------+--------+----------+-------------+ +----+-------------+---------------+------------+-------+--------------------------------------+--------------------+---------+-------------+------+----------+-------------+ | id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra | +----+-------------+---------------+------------+-------+--------------------------------------+--------------------+---------+-------------+------+----------+-------------+ | 1 | UPDATE | host_scd_data | NULL | range | uniq_entity_bucket,idx_dataset_range | uniq_entity_bucket | 604 | const,const | 3030 | 100.00 | Using where | +----+-------------+---------------+------------+-------+--------------------------------------+--------------------+---------+-------------+------+----------+-------------+ ``` Using a test set of data (~144k "open" rows), UPDATES happened at 9 ops per second. 
after: ``` +----+-------------+---------------+------------+-------+----------------------+----------------------+---------+-------+-------+----------+-------------+ | id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra | +----+-------------+---------------+------------+-------+----------------------+----------------------+---------+-------+-------+----------+-------------+ | 1 | DELETE | host_scd_data | NULL | range | idx_valid_to_dataset | idx_valid_to_dataset | 5 | const | 55749 | 100.00 | Using where | +----+-------------+---------------+------------+-------+----------------------+----------------------+---------+-------+-------+----------+-------------+ +----+-------------+---------------+------------+-------+-----------------------------------------------------------+----------------------+---------+-------------------+------+----------+------------------------------+ | id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra | +----+-------------+---------------+------------+-------+-----------------------------------------------------------+----------------------+---------+-------------------+------+----------+------------------------------+ | 1 | UPDATE | host_scd_data | NULL | range | uniq_entity_bucket,idx_dataset_range,idx_valid_to_dataset | idx_valid_to_dataset | 609 | const,const,const | 4 | 100.00 | Using where; Using temporary | +----+-------------+---------------+------------+-------+-----------------------------------------------------------+----------------------+---------+-------------------+------+----------+------------------------------+ ``` Using the same test set of data, UPDATES happened at 4,910 ops per second. For unreleased bug fixes in a release candidate, one of: - [X] Confirmed that the fix is not expected to adversely impact load test results this should significantly improve results! 
- [ ] Alerted the release DRI if additional load testing is needed ## Database migrations - [X] Checked schema for all modified table for columns that will auto-update timestamps during migration. - [ ] Confirmed that updating the timestamps is acceptable, and will not cause unwanted side effects. - [ ] Ensured the correct collation is explicitly set for character columns (`COLLATE utf8mb4_unicode_ci`). ## Summary by CodeRabbit * **Chores** * Cleanup now runs in controlled, ordered batches, removing only closed/historical records while respecting cancellation; error reporting for cleanup was strengthened. * Added a new composite index on historical data to improve cleanup and query performance. * **Tests** * Added tests and test helpers validating batched cleanup behavior, preservation of open records, multi-batch operation, and cancellation handling. --- server/chart/arch_test.go | 9 +- server/chart/internal/mysql/data.go | 36 ++++++-- server/chart/internal/mysql/data_test.go | 82 +++++++++++++++++++ server/chart/internal/testutils/testutils.go | 76 +++++++++++++++++ .../tables/20260423161823_AddHostSCDData.go | 3 +- server/datastore/mysql/schema.sql | 3 +- 6 files changed, 196 insertions(+), 13 deletions(-) create mode 100644 server/chart/internal/testutils/testutils.go diff --git a/server/chart/arch_test.go b/server/chart/arch_test.go index 17d5d8f9803..9a2a640b073 100644 --- a/server/chart/arch_test.go +++ b/server/chart/arch_test.go @@ -61,9 +61,11 @@ func TestChartPackageDependencies(t *testing.T) { ignoreDeps: []string{m + "/server/chart/api"}, }, { - name: "internal/mysql depends on chart, types, and platform", - pkg: m + "/server/chart/internal/mysql", - ignoreDeps: slices.Concat(chartPkgs, platformPkgs), + name: "internal/mysql depends on chart, types, and platform", + pkg: m + "/server/chart/internal/mysql", + ignoreDeps: slices.Concat(chartPkgs, platformPkgs, []string{ + m + "/server/chart/internal/testutils", + }), }, { name: "internal/service depends 
on chart and platform packages", @@ -84,6 +86,7 @@ func TestChartPackageDependencies(t *testing.T) { ignoreDeps: slices.Concat([]string{ m + "/server/chart/internal/mysql", m + "/server/chart/internal/service", + m + "/server/chart/internal/testutils", }, chartPkgs, platformPkgs), }, } diff --git a/server/chart/internal/mysql/data.go b/server/chart/internal/mysql/data.go index 82f21852824..ede0f16b6d3 100644 --- a/server/chart/internal/mysql/data.go +++ b/server/chart/internal/mysql/data.go @@ -20,6 +20,11 @@ var scdOpenSentinel = time.Date(9999, 12, 31, 0, 0, 0, 0, time.UTC) // scdUpsertBatch caps how many entity rows are written per INSERT statement. const scdUpsertBatch = 200 +// scdCleanupBatch caps how many rows CleanupSCDData deletes per statement, so +// each batch's lock window is short and concurrent writers can interleave. +// var (not const) so tests can shrink it to exercise multi-batch behavior. +var scdCleanupBatch = 1000 + // scdRow is a single row of host_scd_data as fetched by GetSCDData. type scdRow struct { EntityID string `db:"entity_id"` @@ -367,17 +372,32 @@ func aggregateBucket(rows []scdRow, bucketStart, bucketEnd time.Time, strategy a // CleanupSCDData deletes closed SCD rows whose valid_to is older than the // retention cutoff. Open rows (valid_to = sentinel) are always preserved. +// Deletes in batches so each statement holds locks briefly and the concurrent +// collection cron can interleave writes. func (ds *Datastore) CleanupSCDData(ctx context.Context, days int) error { // Compute the cutoff in Go (UTC) so the retention boundary doesn't depend // on the MySQL session time zone — all valid_to writes are UTC. cutoff := time.Now().UTC().AddDate(0, 0, -days) - _, err := ds.writer(ctx).ExecContext(ctx, - `DELETE FROM host_scd_data - WHERE valid_to < ? 
- AND valid_to <> ?`, - cutoff, scdOpenSentinel) - if err != nil { - return ctxerr.Wrap(ctx, err, "cleanup SCD data") + for { + if err := ctx.Err(); err != nil { + return ctxerr.Wrap(ctx, err, "cleanup SCD data") + } + res, err := ds.writer(ctx).ExecContext(ctx, + `DELETE FROM host_scd_data + WHERE valid_to < ? + AND valid_to <> ? + ORDER BY valid_to + LIMIT ?`, + cutoff, scdOpenSentinel, scdCleanupBatch) + if err != nil { + return ctxerr.Wrap(ctx, err, "cleanup SCD data") + } + n, err := res.RowsAffected() + if err != nil { + return ctxerr.Wrap(ctx, err, "cleanup SCD data rows affected") + } + if n < int64(scdCleanupBatch) { + return nil + } } - return nil } diff --git a/server/chart/internal/mysql/data_test.go b/server/chart/internal/mysql/data_test.go index 6328b92202d..4d16809d745 100644 --- a/server/chart/internal/mysql/data_test.go +++ b/server/chart/internal/mysql/data_test.go @@ -1,12 +1,16 @@ package mysql import ( + "context" + "fmt" "testing" "time" "github.com/fleetdm/fleet/v4/server/chart" "github.com/fleetdm/fleet/v4/server/chart/api" + "github.com/fleetdm/fleet/v4/server/chart/internal/testutils" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAggregateBucketAccumulate(t *testing.T) { @@ -107,3 +111,81 @@ func TestAggregateBucketSnapshotRowClosedExactlyAtBucketEnd(t *testing.T) { got := aggregateBucket(rows, bucketStart, bucketEnd, api.SampleStrategySnapshot) assert.Equal(t, 2, chart.BlobPopcount(got), "row whose valid_to equals bucketEnd covers bucketEnd-ε") } + +func TestCleanupSCDData(t *testing.T) { + tdb := testutils.SetupTestDB(t, "chart_mysql") + ds := NewDatastore(tdb.Conns(), tdb.Logger) + + cases := []struct { + name string + fn func(t *testing.T, tdb *testutils.TestDB, ds *Datastore) + }{ + {"PreservesOpenAndRecent", testCleanupPreservesOpenAndRecent}, + {"MultipleBatches", testCleanupMultipleBatches}, + {"HonorsCtxCancellation", testCleanupHonorsCtxCancellation}, + } + for _, c := range cases { + 
t.Run(c.name, func(t *testing.T) { + defer tdb.TruncateTables(t) + c.fn(t, tdb, ds) + }) + } +} + +func testCleanupPreservesOpenAndRecent(t *testing.T, tdb *testutils.TestDB, ds *Datastore) { + ctx := t.Context() + now := time.Now().UTC() + + // Old closed row — should be deleted (valid_to is 40 days ago, retention 30). + tdb.InsertSCDRow(t, "cve", "old", now.AddDate(0, 0, -45), now.AddDate(0, 0, -40)) + // Recent closed row — within retention window, should be preserved. + tdb.InsertSCDRow(t, "cve", "recent", now.AddDate(0, 0, -10), now.AddDate(0, 0, -5)) + // Open row (sentinel valid_to) — must always be preserved. + tdb.InsertSCDRow(t, "cve", "open", now.AddDate(0, 0, -45), scdOpenSentinel) + + require.NoError(t, ds.CleanupSCDData(ctx, 30)) + + assert.Equal(t, 2, tdb.CountSCDRows(t), "only the old closed row should be deleted") + + var entities []string + require.NoError(t, tdb.DB.SelectContext(ctx, &entities, `SELECT entity_id FROM host_scd_data ORDER BY entity_id`)) + assert.Equal(t, []string{"open", "recent"}, entities) +} + +func testCleanupMultipleBatches(t *testing.T, tdb *testutils.TestDB, ds *Datastore) { + ctx := t.Context() + now := time.Now().UTC() + + // Shrink batch size so we can prove the loop iterates without inserting + // thousands of rows. + prev := scdCleanupBatch + scdCleanupBatch = 3 + t.Cleanup(func() { scdCleanupBatch = prev }) + + // Insert 10 expired closed rows — that's 4 iterations at batch size 3 + // (3 + 3 + 3 + 1, where the final partial batch terminates the loop). 
+ for i := range 10 { + validFrom := now.AddDate(0, 0, -45).Add(time.Duration(i) * time.Minute) + validTo := now.AddDate(0, 0, -40).Add(time.Duration(i) * time.Minute) + tdb.InsertSCDRow(t, "cve", fmt.Sprintf("e%d", i), validFrom, validTo) + } + + require.NoError(t, ds.CleanupSCDData(ctx, 30)) + + assert.Equal(t, 0, tdb.CountSCDRows(t), "all expired rows should be drained across batches") +} + +func testCleanupHonorsCtxCancellation(t *testing.T, tdb *testutils.TestDB, ds *Datastore) { + now := time.Now().UTC() + + // Insert a single expired row so a non-canceled call would have something + // to delete — confirms that nothing was removed because of cancellation. + tdb.InsertSCDRow(t, "cve", "old", now.AddDate(0, 0, -45), now.AddDate(0, 0, -40)) + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + err := ds.CleanupSCDData(ctx, 30) + require.ErrorIs(t, err, context.Canceled) + assert.Equal(t, 1, tdb.CountSCDRows(t), "no rows should be deleted when ctx was canceled before the first batch") +} diff --git a/server/chart/internal/testutils/testutils.go b/server/chart/internal/testutils/testutils.go new file mode 100644 index 00000000000..51378edd197 --- /dev/null +++ b/server/chart/internal/testutils/testutils.go @@ -0,0 +1,76 @@ +// Package testutils provides shared test utilities for the chart bounded context. +package testutils + +import ( + "log/slog" + "testing" + "time" + + common_mysql "github.com/fleetdm/fleet/v4/server/platform/mysql" + mysql_testing_utils "github.com/fleetdm/fleet/v4/server/platform/mysql/testing_utils" + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/require" +) + +// TestDB holds the database connection for tests. +type TestDB struct { + DB *sqlx.DB + Logger *slog.Logger +} + +// SetupTestDB creates a test database with the Fleet schema loaded. Tests are +// skipped automatically when MYSQL_TEST is not set. 
+func SetupTestDB(t *testing.T, testNamePrefix string) *TestDB { + t.Helper() + + testName, opts := mysql_testing_utils.ProcessOptions(t, &mysql_testing_utils.DatastoreTestOptions{ + UniqueTestName: testNamePrefix + "_" + t.Name(), + }) + + mysql_testing_utils.LoadDefaultSchema(t, testName, opts) + config := mysql_testing_utils.MysqlTestConfig(testName) + db, err := common_mysql.NewDB(config, &common_mysql.DBOptions{}, "") + require.NoError(t, err) + + t.Cleanup(func() { db.Close() }) + + return &TestDB{ + DB: db, + Logger: slog.New(slog.DiscardHandler), + } +} + +// Conns returns DBConnections for creating a datastore. +func (tdb *TestDB) Conns() *common_mysql.DBConnections { + return &common_mysql.DBConnections{Primary: tdb.DB, Replica: tdb.DB} +} + +// TruncateTables clears the tables used by the chart bounded context. +func (tdb *TestDB) TruncateTables(t *testing.T) { + t.Helper() + mysql_testing_utils.TruncateTables(t, tdb.DB, tdb.Logger, nil, "host_scd_data") +} + +// InsertSCDRow inserts a single host_scd_data row for tests. host_bitmap is +// stored as an empty blob since cleanup tests don't care about its contents. +func (tdb *TestDB) InsertSCDRow(t *testing.T, dataset, entityID string, validFrom, validTo time.Time) { + t.Helper() + ctx := t.Context() + + _, err := tdb.DB.ExecContext(ctx, ` + INSERT INTO host_scd_data (dataset, entity_id, host_bitmap, valid_from, valid_to) + VALUES (?, ?, ?, ?, ?) + `, dataset, entityID, []byte{}, validFrom, validTo) + require.NoError(t, err) +} + +// CountSCDRows returns the total number of rows in host_scd_data. 
+func (tdb *TestDB) CountSCDRows(t *testing.T) int { + t.Helper() + ctx := t.Context() + + var n int + err := tdb.DB.GetContext(ctx, &n, `SELECT COUNT(*) FROM host_scd_data`) + require.NoError(t, err) + return n +} diff --git a/server/datastore/mysql/migrations/tables/20260423161823_AddHostSCDData.go b/server/datastore/mysql/migrations/tables/20260423161823_AddHostSCDData.go index 7cae3b8b24a..ef010757d18 100644 --- a/server/datastore/mysql/migrations/tables/20260423161823_AddHostSCDData.go +++ b/server/datastore/mysql/migrations/tables/20260423161823_AddHostSCDData.go @@ -32,7 +32,8 @@ func Up_20260423161823(tx *sql.Tx) error { updated_at TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (id), UNIQUE KEY uniq_entity_bucket (dataset, entity_id, valid_from), - KEY idx_dataset_range (dataset, valid_from, valid_to) + KEY idx_dataset_range (dataset, valid_from, valid_to), + KEY idx_valid_to_dataset (valid_to, dataset, entity_id) ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci `) if err != nil { diff --git a/server/datastore/mysql/schema.sql b/server/datastore/mysql/schema.sql index 24ec181967b..45a1df8c745 100644 --- a/server/datastore/mysql/schema.sql +++ b/server/datastore/mysql/schema.sql @@ -1107,7 +1107,8 @@ CREATE TABLE `host_scd_data` ( `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `uniq_entity_bucket` (`dataset`,`entity_id`,`valid_from`), - KEY `idx_dataset_range` (`dataset`,`valid_from`,`valid_to`) + KEY `idx_dataset_range` (`dataset`,`valid_from`,`valid_to`), + KEY `idx_valid_to_dataset` (`valid_to`,`dataset`,`entity_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; /*!40101 SET character_set_client = @saved_cs_client */; /*!40101 SET @saved_cs_client = @@character_set_client */;