From c7cdd30212e406048eb7b37a4719f51a36a5f1be Mon Sep 17 00:00:00 2001 From: Stephen Hinck Date: Mon, 29 Jun 2026 10:49:06 -0700 Subject: [PATCH] feat: Optimize PG graph database wipes - BED-8787 --- drivers/pg/driver.go | 24 +++++ integration/wipe_graph_test.go | 157 +++++++++++++++++++++++++++++++++ 2 files changed, 181 insertions(+) create mode 100644 integration/wipe_graph_test.go diff --git a/drivers/pg/driver.go b/drivers/pg/driver.go index 5b8adb1b..36833741 100644 --- a/drivers/pg/driver.go +++ b/drivers/pg/driver.go @@ -186,3 +186,27 @@ func (s *Driver) OptimizeStorage(ctx context.Context) error { return optimizeStorage(ctx, conn) } + +// WipeGraph truncates the partitioned node and edge tables, removing every node and edge across all graphs in a single +// statement that bypasses the per-row edge cascade trigger. The optional retain delegate runs within the same +// transaction after the truncate, allowing callers to atomically recreate any data that must survive the wipe. If retain +// returns an error the transaction is rolled back and the graph is left untouched. +func (s *Driver) WipeGraph(ctx context.Context, retain graph.TransactionDelegate) error { + return s.WriteTransaction(ctx, func(tx graph.Transaction) error { + result := tx.Raw("truncate table node, edge;", nil) + + // Close before issuing further statements: a pgx transaction shares a single connection and cannot run the + // retain delegate's queries while these rows remain open. + result.Close() + + if err := result.Error(); err != nil { + return fmt.Errorf("truncating graph tables: %w", err) + } + + if retain != nil { + return retain(tx) + } + + return nil + }) +} diff --git a/integration/wipe_graph_test.go b/integration/wipe_graph_test.go new file mode 100644 index 00000000..0e1390f2 --- /dev/null +++ b/integration/wipe_graph_test.go @@ -0,0 +1,157 @@ +package integration + +import ( + "context" + "errors" + "testing" + + "github.com/specterops/dawgs/drivers/pg" + "github.com/specterops/dawgs/graph" + "github.com/specterops/dawgs/query" + "github.com/stretchr/testify/require" +) + +// WipeGraph is a Postgres-only bulk-delete primitive, so this suite is scoped to the pg driver and skips itself unless +// CONNECTION_STRING selects a Postgres backend. +func TestWipeGraph(t *testing.T) { + var ( + wipeNode = graph.StringKind("WipeNode") + survivor = graph.StringKind("WipeSurvivor") + wipeEdge = graph.StringKind("WIPE_EDGE") + + defaultGraph = graph.Graph{Name: "wipe_default", Nodes: graph.Kinds{wipeNode, survivor}, Edges: graph.Kinds{wipeEdge}} + secondaryGraph = graph.Graph{Name: "wipe_secondary", Nodes: graph.Kinds{wipeNode, survivor}, Edges: graph.Kinds{wipeEdge}} + + schema = graph.Schema{ + Graphs: []graph.Graph{defaultGraph, secondaryGraph}, + DefaultGraph: defaultGraph, + } + ) + + session := Open(t, Options{ + RequireDriver: pg.DriverName, + SkipIfNoConnection: true, + SkipIfDriverMismatch: true, + Schema: &schema, + }) + + var ( + ctx = session.Ctx + db = session.DB + wiper, isPG = graph.AsDriver[*pg.Driver](db) + ) + + require.True(t, isPG, "expected a Postgres driver") + + // seed populates the default and secondary partitions with two connected nodes and an additional node in the + // secondary graph so the test exercises a multi-partition truncate. + seed := func(t *testing.T) { + require.NoError(t, db.WriteTransaction(ctx, func(tx graph.Transaction) error { + start, err := tx.CreateNode(graph.NewProperties(), wipeNode) + if err != nil { + return err + } + + end, err := tx.CreateNode(graph.NewProperties(), wipeNode) + if err != nil { + return err + } + + if _, err := tx.CreateRelationshipByIDs(start.ID, end.ID, wipeEdge, graph.NewProperties()); err != nil { + return err + } + + if _, err := tx.WithGraph(secondaryGraph).CreateNode(graph.NewProperties(), wipeNode); err != nil { + return err + } + + return nil + })) + } + + t.Run("truncates all partitions and atomically retains the recreated node", func(t *testing.T) { + session.ClearGraph(t) + seed(t) + + require.Equal(t, int64(3), countNodes(t, ctx, db)) + require.Equal(t, int64(1), countEdges(t, ctx, db)) + + require.NoError(t, wiper.WipeGraph(ctx, func(tx graph.Transaction) error { + _, err := tx.CreateNode(graph.AsProperties(map[string]any{"name": "kept"}), survivor) + return err + })) + + require.Equal(t, int64(1), countNodes(t, ctx, db)) + require.Equal(t, int64(0), countEdges(t, ctx, db)) + + require.NoError(t, db.ReadTransaction(ctx, func(tx graph.Transaction) error { + node, err := tx.Nodes().Filter(query.Kind(query.Node(), survivor)).First() + if err != nil { + return err + } + + require.True(t, node.Kinds.ContainsOneOf(survivor)) + + name, err := node.Properties.Get("name").String() + require.NoError(t, err) + require.Equal(t, "kept", name) + + return nil + })) + }) + + t.Run("rolls back the truncate when the retain delegate fails", func(t *testing.T) { + session.ClearGraph(t) + seed(t) + + errRetain := errors.New("retain failed") + + err := wiper.WipeGraph(ctx, func(tx graph.Transaction) error { + return errRetain + }) + + require.ErrorIs(t, err, errRetain) + + // The transaction rolled back, so the seeded graph is left untouched. + require.Equal(t, int64(3), countNodes(t, ctx, db)) + require.Equal(t, int64(1), countEdges(t, ctx, db)) + }) + + t.Run("truncates without a retain delegate", func(t *testing.T) { + session.ClearGraph(t) + seed(t) + + require.NoError(t, wiper.WipeGraph(ctx, nil)) + + require.Equal(t, int64(0), countNodes(t, ctx, db)) + require.Equal(t, int64(0), countEdges(t, ctx, db)) + }) +} + +func countNodes(t *testing.T, ctx context.Context, db graph.Database) int64 { + t.Helper() + + var count int64 + + require.NoError(t, db.ReadTransaction(ctx, func(tx graph.Transaction) error { + result, err := tx.Nodes().Count() + count = result + return err + })) + + return count +} + +func countEdges(t *testing.T, ctx context.Context, db graph.Database) int64 { + t.Helper() + + var count int64 + + require.NoError(t, db.ReadTransaction(ctx, func(tx graph.Transaction) error { + result, err := tx.Relationships().Count() + count = result + return err + })) + + return count +}