Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions drivers/pg/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pg
import (
"context"
"fmt"
"strings"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -186,3 +187,127 @@ func (s *Driver) OptimizeStorage(ctx context.Context) error {

return optimizeStorage(ctx, conn)
}

// resolveKindIDs maps kinds to their integer IDs, refreshing the schema cache once on a miss. It returns the resolved
// IDs alongside any kinds that remain undefined after the refresh, so callers can decide whether an unresolved kind is
// a tolerable no-op (include predicates) or must fail closed (exclude predicates).
func (s *Driver) resolveKindIDs(ctx context.Context, kinds graph.Kinds) ([]int16, graph.Kinds, error) {
if len(kinds) == 0 {
return nil, nil, nil
}

s.lock.RLock()
if kindIDs, missingKinds := s.mapKinds(kinds); len(missingKinds) == 0 {
s.lock.RUnlock()
return kindIDs, nil, nil
}
s.lock.RUnlock()

s.lock.Lock()
defer s.lock.Unlock()

if err := s.Fetch(ctx); err != nil {
return nil, nil, err
}

kindIDs, missingKinds := s.mapKinds(kinds)
return kindIDs, missingKinds, nil
}

// DeleteNodesByKinds performs a server-side, set-based delete of nodes using the kind_ids GIN index instead of
// streaming node IDs through the application. A node is deleted when its kind_ids overlap includeAny (or, when
// includeAny is empty, for every node) and do not overlap excludeAny. Deleting nodes fires the delete_node_edges
// trigger, cascading the attached edge deletes.
//
// includeAny is mapped to kind IDs tolerantly: include kinds that are not defined in the database map to no IDs and
// therefore match no nodes, so a request that targets only undefined kinds is a safe no-op rather than an accidental
// full delete. excludeAny is mapped fail-closed: if any exclude kind is undefined the delete is refused, because
// silently dropping an exclusion would widen the delete and could remove protected nodes (e.g. an unresolved
// MigrationData would turn a guarded wipe into an unguarded delete from node).
func (s *Driver) DeleteNodesByKinds(ctx context.Context, includeAny graph.Kinds, excludeAny graph.Kinds) error {
includeIDs, _, err := s.resolveKindIDs(ctx, includeAny)
if err != nil {
return err
}

excludeIDs, excludeMissing, err := s.resolveKindIDs(ctx, excludeAny)
if err != nil {
return err
}
if len(excludeMissing) > 0 {
return fmt.Errorf("cannot exclude undefined kinds from node delete: %v", excludeMissing)
}

statement, arguments := buildNodeDeleteStatement(len(includeAny) > 0, includeIDs, excludeIDs)

conn, err := s.pool.Acquire(ctx)
if err != nil {
return fmt.Errorf("acquire connection for node delete: %w", err)
}
defer conn.Release()

if _, err := conn.Exec(ctx, statement, arguments...); err != nil {
return fmt.Errorf("%s: %w", statement, err)
}

return nil
}

// buildNodeDeleteStatement renders the node delete statement and its positional arguments for the given resolved kind
// IDs. The include predicate is emitted whenever an include filter was requested (includeRequested), even if includeIDs
// is empty, so that targeting only undefined kinds matches no nodes. The exclude predicate is emitted only when
// excludeIDs is non-empty, so an unresolved exclusion can never widen the delete into an unguarded wipe.
func buildNodeDeleteStatement(includeRequested bool, includeIDs []int16, excludeIDs []int16) (string, []any) {
var (
predicates []string
arguments []any
)

if includeRequested {
arguments = append(arguments, includeIDs)
predicates = append(predicates, fmt.Sprintf("kind_ids operator (pg_catalog.&&) $%d::int2[]", len(arguments)))
}

if len(excludeIDs) > 0 {
arguments = append(arguments, excludeIDs)
predicates = append(predicates, fmt.Sprintf("not (kind_ids operator (pg_catalog.&&) $%d::int2[])", len(arguments)))
}

statement := "delete from node"
if len(predicates) > 0 {
statement += " where " + strings.Join(predicates, " and ")
}

return statement, arguments
}

// DeleteRelationshipsByKinds performs a server-side, set-based delete of relationships whose kind_id matches any of
// the given kinds, using the edge_kind_id_id_start_id_end_id_index covering index instead of streaming relationship
// IDs through the application.
//
// kinds are mapped to kind IDs tolerantly: kinds that are not defined in the database map to no IDs. An empty kinds
// argument, or one that maps entirely to undefined kinds, deletes nothing rather than every relationship.
func (s *Driver) DeleteRelationshipsByKinds(ctx context.Context, kinds graph.Kinds) error {
if len(kinds) == 0 {
return nil
}

kindIDs, _, err := s.resolveKindIDs(ctx, kinds)
if err != nil {
return err
}

const statement = "delete from edge where kind_id = any($1::int2[])"

conn, err := s.pool.Acquire(ctx)
if err != nil {
return fmt.Errorf("acquire connection for relationship delete: %w", err)
}
defer conn.Release()

if _, err := conn.Exec(ctx, statement, kindIDs); err != nil {
return fmt.Errorf("%s: %w", statement, err)
}

return nil
}
100 changes: 100 additions & 0 deletions drivers/pg/driver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package pg

import (
"context"
"testing"

"github.com/specterops/dawgs/graph"
"github.com/stretchr/testify/require"
)

// TestBuildNodeDeleteStatement covers the statement/argument construction for DeleteNodesByKinds, including the guard
// that prevents an unresolved exclusion from widening the delete into an unguarded wipe.
func TestBuildNodeDeleteStatement(t *testing.T) {
var (
includeIDs = []int16{1, 2}
excludeIDs = []int16{9}
)

t.Run("no filters deletes all nodes", func(t *testing.T) {
statement, arguments := buildNodeDeleteStatement(false, nil, nil)
require.Equal(t, "delete from node", statement)
require.Empty(t, arguments)
})

t.Run("include only", func(t *testing.T) {
statement, arguments := buildNodeDeleteStatement(true, includeIDs, nil)
require.Equal(t, "delete from node where kind_ids operator (pg_catalog.&&) $1::int2[]", statement)
require.Equal(t, []any{includeIDs}, arguments)
})

t.Run("exclude only", func(t *testing.T) {
statement, arguments := buildNodeDeleteStatement(false, nil, excludeIDs)
require.Equal(t, "delete from node where not (kind_ids operator (pg_catalog.&&) $1::int2[])", statement)
require.Equal(t, []any{excludeIDs}, arguments)
})

t.Run("include and exclude are positionally numbered", func(t *testing.T) {
statement, arguments := buildNodeDeleteStatement(true, includeIDs, excludeIDs)
require.Equal(t, "delete from node where kind_ids operator (pg_catalog.&&) $1::int2[] and not (kind_ids operator (pg_catalog.&&) $2::int2[])", statement)
require.Equal(t, []any{includeIDs, excludeIDs}, arguments)
})

t.Run("empty excludeIDs cannot widen the delete", func(t *testing.T) {
// A requested-but-unresolved exclusion must never emit a not(... && '{}') clause that matches every row.
statement, arguments := buildNodeDeleteStatement(false, nil, []int16{})
require.Equal(t, "delete from node", statement)
require.Empty(t, arguments)

statement, arguments = buildNodeDeleteStatement(true, includeIDs, []int16{})
require.Equal(t, "delete from node where kind_ids operator (pg_catalog.&&) $1::int2[]", statement)
require.Equal(t, []any{includeIDs}, arguments)
})

t.Run("include requested with empty IDs is a tolerant no-op predicate", func(t *testing.T) {
statement, arguments := buildNodeDeleteStatement(true, []int16{}, nil)
require.Equal(t, "delete from node where kind_ids operator (pg_catalog.&&) $1::int2[]", statement)
require.Equal(t, []any{[]int16{}}, arguments)
})
}

// TestResolveKindIDsDefinedFastPath exercises the cache-hit path of resolveKindIDs, which resolves defined kinds
// without touching the database. The cache-miss/refresh and fail-closed exclude paths require a live pool and are
// covered by the integration suite.
func TestResolveKindIDsDefinedFastPath(t *testing.T) {
ctx := context.Background()

driver := &Driver{SchemaManager: NewSchemaManager(nil, 0)}

var (
userKind = graph.StringKind("User")
groupKind = graph.StringKind("Group")
)
driver.kindsByID[userKind] = 1
driver.kindsByID[groupKind] = 2

t.Run("defined kinds resolve with no missing", func(t *testing.T) {
ids, missing, err := driver.resolveKindIDs(ctx, graph.Kinds{userKind, groupKind})
require.NoError(t, err)
require.Empty(t, missing)
require.ElementsMatch(t, []int16{1, 2}, ids)
})

t.Run("empty kinds short-circuit", func(t *testing.T) {
ids, missing, err := driver.resolveKindIDs(ctx, nil)
require.NoError(t, err)
require.Nil(t, ids)
require.Nil(t, missing)
})
}

// TestDeleteRelationshipsByKindsEmptyIsNoop verifies that an empty kinds request returns before acquiring a
// connection, so it is a safe no-op rather than deleting every relationship.
func TestDeleteRelationshipsByKindsEmptyIsNoop(t *testing.T) {
ctx := context.Background()

driver := &Driver{SchemaManager: NewSchemaManager(nil, 0)}

require.NoError(t, driver.DeleteRelationshipsByKinds(ctx, nil))
require.NoError(t, driver.DeleteRelationshipsByKinds(ctx, graph.Kinds{}))
}
Loading
Loading