Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 62 additions & 3 deletions packages/storage/src/graphdb-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,16 +406,43 @@ const NODE_SENTINEL_ROW = buildNodeSentinel();
* accept them silently. Synthesize a minimal placeholder so the bulk
* load completes, with the original id preserved for round-trip.
*/
/** Distinct from/to ids referenced by an edge batch. */
function edgeEndpointIds(
edges: readonly { readonly from: NodeId; readonly to: NodeId }[],
): readonly string[] {
const ids = new Set<string>();
for (const e of edges) {
ids.add(e.from as string);
ids.add(e.to as string);
}
return [...ids];
}

/**
* Synthesize a placeholder CodeNode for every edge endpoint id that has no
* real node, so lbug's COPY (which requires a real PK for each rel endpoint)
* succeeds.
*
* `alreadyPersisted` (upsert mode only) lists endpoint ids that already exist
* in the store. Those must NOT be synthesized: a placeholder would later be
* `mergeNodes`-ed (DETACH DELETE + re-insert), clobbering the real node. In
* replace mode the store was just truncated, so pass `undefined`.
*/
function synthesizePlaceholderNodes(
nodes: readonly GraphNode[],
edges: readonly { readonly from: NodeId; readonly to: NodeId }[],
alreadyPersisted?: ReadonlySet<string>,
): GraphNode[] {
const known = new Set<string>();
for (const n of nodes) known.add(n.id as string);
const missing = new Set<string>();
for (const e of edges) {
if (!known.has(e.from as string)) missing.add(e.from as string);
if (!known.has(e.to as string)) missing.add(e.to as string);
if (!known.has(e.from as string) && !(alreadyPersisted?.has(e.from as string) ?? false)) {
missing.add(e.from as string);
}
if (!known.has(e.to as string) && !(alreadyPersisted?.has(e.to as string) ?? false)) {
missing.add(e.to as string);
}
}
if (missing.size === 0) return [];
const out: GraphNode[] = [];
Expand Down Expand Up @@ -677,7 +704,22 @@ export class GraphDbStore implements IGraphStore {
// in `reason`) that never have a matching node. Synthesize one
// CodeNode per orphan id so the COPY succeeds; downstream tools
// recognise these by their well-known id prefix.
const synthetic = synthesizePlaceholderNodes(nodes, edges);
//
// Upsert mode caveat: a batch that carries ONLY new nodes (e.g.
// `ingest-sarif` upserts Finding nodes plus FOUND_IN edges into the
// already-persisted graph) references real, previously-loaded nodes
// that are absent from THIS batch. Synthesizing a placeholder for
// such an id and then `mergeNodes`-ing it (DETACH DELETE + re-insert)
// would DESTROY the real node — turning, e.g., the Function a finding
// was found in into a `<placeholder>` Route. So in upsert mode we must
// exclude ids that already exist in the store from synthesis; only
// genuinely-orphan ids (no node in the batch AND none in the store)
// get a placeholder.
const existingIds =
mode === "upsert"
? await this.filterExistingNodeIds(pool, edgeEndpointIds(edges))
: undefined;
const synthetic = synthesizePlaceholderNodes(nodes, edges, existingIds);
const allNodes = synthetic.length > 0 ? [...nodes, ...synthetic] : nodes;
await this.insertNodes(pool, allNodes, mode, reportProgress, started);

Expand Down Expand Up @@ -750,6 +792,23 @@ export class GraphDbStore implements IGraphStore {
await pool.query("MATCH (n:CodeNode) DELETE n");
}

/**
* Return the subset of `candidateIds` that already exist as CodeNodes in the
* store. Used by upsert-mode bulkLoad to avoid synthesizing a placeholder
* (and then `mergeNodes`-clobbering) for an edge endpoint that is a real,
* previously-persisted node not present in the current batch. Reuses the
* `listNodes({ ids })` finder so id decoding stays in one place; passes no
* `limit` so every match is returned.
*/
private async filterExistingNodeIds(
_pool: GraphDbPool,
candidateIds: readonly string[],
): Promise<ReadonlySet<string>> {
if (candidateIds.length === 0) return new Set<string>();
const found = await this.listNodes({ ids: candidateIds as readonly NodeId[] });
return new Set(found.map((n) => n.id as string));
}

private async insertNodes(
pool: GraphDbPool,
nodes: readonly GraphNode[],
Expand Down
122 changes: 122 additions & 0 deletions packages/storage/src/graphdb-roundtrip.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -531,3 +531,125 @@ test("round-trip is deterministic across independent writes of the same graph",
assert.equal(hashA, hashB, "hashes across two stores must match");
assert.equal(hashA, originalHash, "hash after round-trip must match the original graph hash");
});

// Regression: upsert-mode bulkLoad must NOT clobber an existing real node when
// the upsert batch carries an edge that references it but not the node itself.
//
// ingest-sarif upserts Finding nodes + FOUND_IN edges into the already-loaded
// graph. A FOUND_IN edge targets the Function the finding sits inside. That
// Function id is absent from the upsert batch (which holds only Findings), so
// synthesizePlaceholderNodes used to mint a `kind:Route` placeholder for it;
// mergeNodes (DETACH DELETE + re-insert) then DESTROYED the real Function,
// turning it into a `<placeholder>` Route. Net effect: every function/method
// containing a scanner finding silently vanished from the graph, breaking
// cross-module context/impact. The fix excludes already-persisted ids from
// synthesis. See field-report Issue 1.
test("upsert with an edge to an existing node does not clobber it into a placeholder", async () => {
if (!(await hasNativeBinding())) {
assert.ok(true, "native binding unavailable — skipping");
return;
}
const dbPath = await scratchDbPath();
const store = new GraphDbStore(dbPath);
try {
await store.open();
await store.createSchema();

// Phase 1 — replace-load a real Function node.
const fnId = makeNodeId("Function", "src/client.py", "get_bedrock_client");
const base = new KnowledgeGraph();
base.addNode({
id: fnId,
kind: "Function",
name: "get_bedrock_client",
filePath: "src/client.py",
startLine: 146,
endLine: 171,
isExported: true,
} as GraphNode);
await store.bulkLoad(base, { mode: "replace" });

// Phase 2 — upsert a Finding + a FOUND_IN edge targeting the Function.
// The Function node is intentionally NOT in this batch.
const findingId = makeNodeId("Finding", "src/client.py", "semgrep:logger-leak:165");
const upsert = new KnowledgeGraph();
upsert.addNode({
id: findingId,
kind: "Finding",
name: "logger-credential-leak",
filePath: "src/client.py",
startLine: 165,
endLine: 170,
} as GraphNode);
upsert.addEdge({
from: findingId,
to: fnId,
type: "FOUND_IN" as RelationType,
confidence: 1,
reason: "startLine=165;endLine=170",
});
await store.bulkLoad(upsert, { mode: "upsert" });

// The Function must still be a Function — not a synthesized Route
// placeholder, and not deleted.
const [node] = await store.listNodes({ ids: [fnId] });
assert.ok(node, "the Function node must survive the finding upsert");
assert.equal(
node.kind,
"Function",
`expected Function, got ${node.kind} (clobbered by placeholder)`,
);
assert.equal(node.name, "get_bedrock_client");
assert.equal(node.filePath, "src/client.py");
assert.notEqual(node.filePath, "<placeholder>");

// The Finding and its edge must also have landed.
const findings = await store.listNodesByKind("Finding", { limit: 10 });
assert.equal(findings.length, 1, "the upserted Finding must persist");
} finally {
await store.close();
}
});

// Companion: a genuinely-orphan edge endpoint (no node in batch, none in store)
// still gets a placeholder so the COPY's PK constraint holds — the upsert fix
// must not break the original unresolved-FETCHES use case.
test("upsert still synthesizes a placeholder for a genuinely-orphan edge endpoint", async () => {
if (!(await hasNativeBinding())) {
assert.ok(true, "native binding unavailable — skipping");
return;
}
const dbPath = await scratchDbPath();
const store = new GraphDbStore(dbPath);
try {
await store.open();
await store.createSchema();
await store.bulkLoad(new KnowledgeGraph(), { mode: "replace" });

const srcId = makeNodeId("Function", "src/api.py", "call_remote");
const orphanId = "Route:<placeholder>:https://example.com/{id}" as NodeId;
const g = new KnowledgeGraph();
g.addNode({
id: srcId,
kind: "Function",
name: "call_remote",
filePath: "src/api.py",
startLine: 1,
endLine: 5,
isExported: true,
} as GraphNode);
g.addEdge({
from: srcId,
to: orphanId,
type: "FETCHES" as RelationType,
confidence: 0.5,
reason: "https://example.com/{id}",
});
await store.bulkLoad(g, { mode: "upsert" });

const [placeholder] = await store.listNodes({ ids: [orphanId] });
assert.ok(placeholder, "orphan FETCHES target must be synthesized so the edge COPY succeeds");
} finally {
await store.close();
}
});
Loading