diff --git a/package-lock.json b/package-lock.json index a25f469d..87a73aa5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,7 @@ "@libpg-query/parser": "^17.6.3", "@opentelemetry/api": "^1.9.0", "@pgsql/types": "^17.6.2", - "@query-doctor/core": "^0.10.5", + "@query-doctor/core": "^0.10.6", "async-sema": "^3.1.1", "capnweb": "^0.7.0", "dedent": "^1.7.1", diff --git a/package.json b/package.json index 0f708836..f39f711b 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,7 @@ "@libpg-query/parser": "^17.6.3", "@opentelemetry/api": "^1.9.0", "@pgsql/types": "^17.6.2", - "@query-doctor/core": "^0.10.5", + "@query-doctor/core": "^0.10.6", "async-sema": "^3.1.1", "capnweb": "^0.7.0", "dedent": "^1.7.1", diff --git a/src/remote/api-client.ts b/src/remote/api-client.ts index 66d8aa84..62319102 100644 --- a/src/remote/api-client.ts +++ b/src/remote/api-client.ts @@ -1,11 +1,11 @@ import { newWebSocketRpcSession, RpcTarget } from "capnweb"; import type { RpcStub } from "capnweb"; -import type { ConnectionMode, UnauthenticatedServerApi, ClientApi, IndexDefinition, ServerApi, RecentQuery } from "@query-doctor/core"; +import type { ConnectionMode, UnauthenticatedServerApi, ClientApi, IndexDefinition, ServerApi } from "@query-doctor/core"; import type { ExportedStats } from "@query-doctor/core"; import { PgIdentifier, Statistics } from "@query-doctor/core"; import { log } from "../log.ts"; import type { Remote } from "./remote.ts"; -import type { OptimizedQuery } from "../sql/recent-query.ts"; +import type { OptimizedQuery, RecentQuery } from "../sql/recent-query.ts"; export function hookUpApiReporter(api: RpcStub, remote: Remote): () => void { const onExtensionPresenceChanged = (presence: Parameters[0]) => { diff --git a/src/remote/query-loader.test.ts b/src/remote/query-loader.test.ts index 048efc17..47d8c502 100644 --- a/src/remote/query-loader.test.ts +++ b/src/remote/query-loader.test.ts @@ -35,7 +35,6 @@ function createMockRecentQuery(query: string): RecentQuery { nudges: [], hash: "test_hash" as QueryHash, normalizedHash: "test_normalized_hash" as QueryHash, - seenAt: Date.now(), optimization: { state: "waiting" }, withOptimization: function () { return this as OptimizedQuery; diff --git a/src/remote/query-optimizer.test.ts b/src/remote/query-optimizer.test.ts index 0edc0790..0fe1cd63 100644 --- a/src/remote/query-optimizer.test.ts +++ b/src/remote/query-optimizer.test.ts @@ -144,7 +144,6 @@ test("controller syncs correctly", async () => { [], 0 as any, 0 as any, - 1, ), ]); expect( diff --git a/src/sql/pgbadger.ts b/src/sql/pgbadger.ts index 9a88e868..258a167b 100644 --- a/src/sql/pgbadger.ts +++ b/src/sql/pgbadger.ts @@ -3,7 +3,7 @@ import { statSync } from "node:fs"; import csv from "fast-csv"; import type { RawRecentQuery, RecentQuery } from "./recent-query.ts"; import { preprocessEncodedJson } from "./json.ts"; -import { QueryCache } from "../sync/seen-cache.ts"; +import { syncQueries } from "../sync/query-sync.ts"; import type { RecentQuerySource } from "./recent-query.ts"; const INTROSPECTION_MARKER = "@qd_introspection"; @@ -60,7 +60,6 @@ export class PgbadgerSource implements RecentQuerySource { constructor( private readonly logPath: string, - private readonly cache: QueryCache = new QueryCache(), ) { this.logSize = statSync(this.logPath).size; console.log(`logPath=${this.logPath},fileSize=${this.logSize}`); @@ -95,6 +94,6 @@ export class PgbadgerSource implements RecentQuerySource { this.totalRows++; } console.log("Finished pgbadger stream"); - return this.cache.sync(rawQueries); + return syncQueries(rawQueries); } } diff --git a/src/sql/recent-query.test.ts b/src/sql/recent-query.test.ts index b24c656a..6a1b82e1 100644 --- a/src/sql/recent-query.test.ts +++ b/src/sql/recent-query.test.ts @@ -138,7 +138,7 @@ test("isTargetlessSelectQuery returns false when table references exist", () => test("constructor sets derived boolean properties correctly for a SELECT on user tables", () => { const refs: TableReference[] = [{ table: "users", schema: "public" }]; - const rq = new RecentQuery(makeRawQuery(), refs, [], [], [], testHash, testNormalizedHash, 1000); + const rq = new RecentQuery(makeRawQuery(), refs, [], [], [], testHash, testNormalizedHash); expect(rq.isSelectQuery).toBe(true); expect(rq.isSystemQuery).toBe(false); expect(rq.isIntrospection).toBe(false); @@ -146,7 +146,7 @@ test("constructor sets derived boolean properties correctly for a SELECT on user }); test("constructor sets isTargetlessSelectQuery=true for SELECT with no table refs", () => { - const rq = new RecentQuery(makeRawQuery(), [], [], [], [], testHash, testNormalizedHash, 1000); + const rq = new RecentQuery(makeRawQuery(), [], [], [], [], testHash, testNormalizedHash); expect(rq.isSelectQuery).toBe(true); expect(rq.isTargetlessSelectQuery).toBe(true); }); @@ -160,7 +160,6 @@ test("constructor sets isTargetlessSelectQuery=false for non-SELECT even with em [], testHash, testNormalizedHash, - 1000, ); expect(rq.isSelectQuery).toBe(false); expect(rq.isTargetlessSelectQuery).toBe(false); @@ -176,7 +175,7 @@ test("constructor copies all data fields from RawRecentQuery", () => { rows: "0", topLevel: false, }); - const rq = new RecentQuery(data, [], [], [], [], testHash, testNormalizedHash, 1000); + const rq = new RecentQuery(data, [], [], [], [], testHash, testNormalizedHash); expect(rq.username).toBe("admin"); expect(rq.query).toBe("SELECT 1"); expect(rq.formattedQuery).toBe("SELECT\n 1"); @@ -185,13 +184,12 @@ test("constructor copies all data fields from RawRecentQuery", () => { expect(rq.rows).toBe("0"); expect(rq.topLevel).toBe(false); expect(rq.hash).toBe(testHash); - expect(rq.seenAt).toBe(1000); }); // --- withOptimization --- test("withOptimization attaches optimization to the instance", () => { - const rq = new RecentQuery(makeRawQuery(), [], [], [], [], testHash, testNormalizedHash, 1000); + const rq = new RecentQuery(makeRawQuery(), [], [], [], [], testHash, testNormalizedHash); const optimization = { plan: "mock plan" } as any; const optimized = rq.withOptimization(optimization); expect(optimized.optimization).toBe(optimization); @@ -203,10 +201,9 @@ test("withOptimization attaches optimization to the instance", () => { test("analyze produces a RecentQuery with formatted query and analysis", async () => { const data = makeRawQuery({ query: "SELECT id FROM users WHERE id = $1" }); - const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 2000); + const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash); expect(rq).toBeInstanceOf(RecentQuery); expect(rq.hash).toBe(testHash); - expect(rq.seenAt).toBe(2000); // The formatted query should have uppercase keywords expect(rq.formattedQuery).toMatch(/SELECT/); // Table references should include 'users' @@ -216,7 +213,7 @@ test("analyze produces a RecentQuery with formatted query and analysis", async ( test("analyze throws on unparseable SQL", async () => { const data = makeRawQuery({ query: "THIS IS NOT VALID SQL AT ALL !!!" }); await expect( - RecentQuery.analyze(data, testHash, testNormalizedHash, 3000), + RecentQuery.analyze(data, testHash, testNormalizedHash), ).rejects.toThrow(); }); @@ -224,7 +221,7 @@ test("analyze throws on unparseable SQL", async () => { test("analyze sets isSelectQuery=true for SELECT", async () => { const data = makeRawQuery({ query: "SELECT * FROM users" }); - const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000); + const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash); expect(rq.isSelectQuery).toBe(true); }); @@ -232,7 +229,7 @@ test("analyze sets isSelectQuery=true for CTE with SELECT", async () => { const data = makeRawQuery({ query: "WITH cte AS (SELECT id FROM users) SELECT * FROM cte", }); - const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000); + const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash); expect(rq.isSelectQuery).toBe(true); }); @@ -241,7 +238,7 @@ test("analyze sets isSelectQuery=false for UPDATE even with SELECT subquery", as query: 'UPDATE "public"."jobs" SET "state" = $1 FROM (SELECT id FROM "public"."jobs" WHERE state = $2 LIMIT 10) AS s1 WHERE "jobs".id = s1.id', }); - const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000); + const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash); expect(rq.isSelectQuery).toBe(false); }); @@ -249,7 +246,7 @@ test("analyze sets isSelectQuery=false for INSERT ... SELECT", async () => { const data = makeRawQuery({ query: "INSERT INTO archive SELECT * FROM users WHERE active = false", }); - const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000); + const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash); expect(rq.isSelectQuery).toBe(false); }); @@ -258,7 +255,7 @@ test("analyze sets isSelectQuery=false for DELETE with EXISTS subquery", async ( query: "DELETE FROM users WHERE EXISTS (SELECT 1 FROM banned WHERE banned.user_id = users.id)", }); - const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000); + const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash); expect(rq.isSelectQuery).toBe(false); }); @@ -269,7 +266,7 @@ test("analyze populates displayQuery for wide SELECTs", async () => { query: 'SELECT "u"."id", "u"."email", "u"."first_name", "u"."last_name", "u"."created_at", "u"."updated_at", "u"."stripe_customer_id" FROM "users" "u" WHERE "u"."id" = $1', }); - const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000); + const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash); // Normalize whitespace because the analyzer prettier-formats the query // before compacting; the site applies the same normalization on render. const normalized = rq.displayQuery?.replace(/\s+/g, " ").trim(); @@ -280,7 +277,7 @@ test("analyze populates displayQuery for wide SELECTs", async () => { test("analyze leaves displayQuery undefined for narrow SELECTs", async () => { const data = makeRawQuery({ query: "SELECT id FROM users WHERE id = $1" }); - const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000); + const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash); expect(rq.displayQuery).toBeUndefined(); }); @@ -289,7 +286,7 @@ test("analyze leaves displayQuery undefined for non-SELECTs", async () => { query: "INSERT INTO archive SELECT a, b, c, d, e, f, g, h FROM users WHERE active = false", }); - const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000); + const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash); expect(rq.displayQuery).toBeUndefined(); }); @@ -298,14 +295,14 @@ test("analyze leaves displayQuery undefined for UNION", async () => { query: "SELECT a, b, c, d, e, f, g FROM t UNION SELECT a, b, c, d, e, f, g FROM u", }); - const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000); + const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash); expect(rq.displayQuery).toBeUndefined(); }); test("analyze strips sqlcommenter tags from formattedQuery", async () => { const data = makeRawQuery({ query: "select 1 /*a='1',b='2'*/" }); - const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000); + const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash); expect(rq.tags).toEqual([ { key: "a", value: "1" }, { key: "b", value: "2" }, diff --git a/src/sql/recent-query.ts b/src/sql/recent-query.ts index 12d66815..e67db76a 100644 --- a/src/sql/recent-query.ts +++ b/src/sql/recent-query.ts @@ -1,6 +1,5 @@ import * as prettier from "prettier"; import prettierPluginSql from "prettier-plugin-sql"; -import type { SegmentedQueryCache } from "../sync/seen-cache.ts"; import { Analyzer, compactSelectList, @@ -18,8 +17,7 @@ import { log } from "../log.ts"; import type { LiveQueryOptimization } from "../remote/optimization.ts"; /** - * Constructed by syncing with {@link SegmentedQueryCache.sync} - * and supplying the date the query was last seen + * Constructed by syncing with {@link syncQueries} */ export class RecentQuery { private static HARDCODED_LIMIT = 50; @@ -50,7 +48,6 @@ export class RecentQuery { readonly nudges: Nudge[], readonly hash: QueryHash, readonly normalizedHash: QueryHash, - readonly seenAt: number, analysisSkipped = false, statementType?: StatementType, ) { @@ -101,7 +98,6 @@ export class RecentQuery { nudges: this.nudges, hash: this.hash, normalizedHash: this.normalizedHash, - seenAt: this.seenAt, optimization: this.optimization, })); } @@ -117,7 +113,6 @@ export class RecentQuery { data: RawRecentQuery, hash: QueryHash, normalizedHash: QueryHash, - seenAt: number, ) { if (data.query.length > RecentQuery.MAX_ANALYZABLE_QUERY_SIZE) { return new RecentQuery( @@ -128,7 +123,6 @@ export class RecentQuery { [], hash, normalizedHash, - seenAt, true, ); } @@ -149,7 +143,6 @@ export class RecentQuery { analysis.nudges, hash, normalizedHash, - seenAt, false, analysis.statementType, ); diff --git a/src/sync/connection-manager.ts b/src/sync/connection-manager.ts index e370e319..fe068169 100644 --- a/src/sync/connection-manager.ts +++ b/src/sync/connection-manager.ts @@ -1,5 +1,4 @@ import type { Postgres } from "@query-doctor/core"; -import { SegmentedQueryCache } from "./seen-cache.ts"; import { Connectable } from "./connectable.ts"; import { PostgresConnector } from "./pg-connector.ts"; import { connectToOptimizer, connectToSource } from "../sql/postgresjs.ts"; @@ -8,8 +7,6 @@ import { connectToOptimizer, connectToSource } from "../sql/postgresjs.ts"; * Manages connections and query caches for each connection */ export class ConnectionManager { - readonly segmentedQueryCache = new SegmentedQueryCache(); - // This prevents connections being garbage collected. // ConnectionMap should be responsible for closing connections private readonly connections = new Map(); @@ -48,7 +45,7 @@ export class ConnectionManager { const sql = input instanceof Connectable ? this.getOrCreateConnection(input) : input; - return new PostgresConnector(sql, this.segmentedQueryCache); + return new PostgresConnector(sql); } async close(connectable: Connectable): Promise { diff --git a/src/sync/pg-connector.ts b/src/sync/pg-connector.ts index 40537375..a125a9cc 100644 --- a/src/sync/pg-connector.ts +++ b/src/sync/pg-connector.ts @@ -17,7 +17,7 @@ import { PostgresQueryBuilder, dumpQueriesSql, } from "@query-doctor/core"; -import { SegmentedQueryCache } from "./seen-cache.ts"; +import { syncQueries } from "./query-sync.ts"; import { FullSchema, FullSchemaColumn } from "@query-doctor/core"; import { ExtensionNotInstalledError, PostgresError } from "./errors.ts"; import { RawRecentQuery, RecentQuery } from "../sql/recent-query.ts"; @@ -96,7 +96,6 @@ export class PostgresConnector implements DatabaseConnector, Rece private static readonly MIN_SIZE_FOR_TABLESAMPLE = 10_000; constructor( private readonly db: Postgres, - private readonly segmentedQueryCache: SegmentedQueryCache, ) { } async onStartAnalyze(): Promise { @@ -494,7 +493,7 @@ ORDER BY source.extensionName.toString() as "pg_stat_statements" | "pg_stat_monitor", ); const results = await this.db.exec(sql); - return await this.segmentedQueryCache.sync(this.db, results); + return await syncQueries(results); } } catch (err) { if ( @@ -526,8 +525,6 @@ ORDER BY SELECT ${source.schema}.pg_stat_monitor_reset(); -- @qd_introspection `); } - - this.segmentedQueryCache.reset(this.db); } catch (err) { if ( err instanceof Error && diff --git a/src/sync/seen-cache.test.ts b/src/sync/query-sync.test.ts similarity index 85% rename from src/sync/seen-cache.test.ts rename to src/sync/query-sync.test.ts index 741c7c09..71410a15 100644 --- a/src/sync/seen-cache.test.ts +++ b/src/sync/query-sync.test.ts @@ -1,5 +1,5 @@ import { test, expect, vi } from "vitest"; -import { QueryCache } from "./seen-cache.ts"; +import { syncQueries } from "./query-sync.ts"; import type { RawRecentQuery } from "../sql/recent-query.ts"; function makeRawQuery(query: string): RawRecentQuery { @@ -15,32 +15,26 @@ function makeRawQuery(query: string): RawRecentQuery { } test("sync skips unparseable queries and returns parseable ones", async () => { - const cache = new QueryCache(); - const validQuery = makeRawQuery("SELECT 1"); // DEALLOCATE $1 is a utility statement that the pg parser rejects const invalidQuery = makeRawQuery("DEALLOCATE $1"); - const results = await cache.sync([validQuery, invalidQuery]); + const results = await syncQueries([validQuery, invalidQuery]); expect(results).toHaveLength(1); expect(results[0].query).toContain("SELECT"); }); test("sync returns empty array when all queries fail", async () => { - const cache = new QueryCache(); - const invalidQuery1 = makeRawQuery("DEALLOCATE $1"); const invalidQuery2 = makeRawQuery("not valid sql !!!"); - const results = await cache.sync([invalidQuery1, invalidQuery2]); + const results = await syncQueries([invalidQuery1, invalidQuery2]); expect(results).toHaveLength(0); }); test("sync bounds concurrency to MAX_CONCURRENCY", async () => { - const cache = new QueryCache(); - let peakConcurrent = 0; let currentConcurrent = 0; @@ -61,7 +55,7 @@ test("sync bounds concurrency to MAX_CONCURRENCY", async () => { makeRawQuery(`SELECT ${i + 1}`), ); - await cache.sync(queries); + await syncQueries(queries); expect(peakConcurrent).toBeLessThanOrEqual(10); expect(peakConcurrent).toBeGreaterThan(1); // verify some concurrency exists diff --git a/src/sync/query-sync.ts b/src/sync/query-sync.ts new file mode 100644 index 00000000..1ab51641 --- /dev/null +++ b/src/sync/query-sync.ts @@ -0,0 +1,42 @@ +import { normalizedFingerprint } from "@query-doctor/core"; +import { + QueryHash, + RawRecentQuery, + RecentQuery, +} from "../sql/recent-query.ts"; +import { fingerprint, parse } from "@libpg-query/parser"; +import { Sema } from "async-sema"; +import { log } from "../log.ts"; + +const MAX_CONCURRENCY = 10; + +async function hash(query: string): Promise { + return QueryHash.parse(await fingerprint(query)); +} + +export async function syncQueries( + rawQueries: RawRecentQuery[], +): Promise { + const sema = new Sema(MAX_CONCURRENCY); + const results = await Promise.allSettled(rawQueries.map(async (rawQuery) => { + await sema.acquire(); + try { + const key = await hash(rawQuery.query); + const normalizedHash = QueryHash.parse( + await normalizedFingerprint(await parse(rawQuery.query), fingerprint), + ); + return await RecentQuery.analyze(rawQuery, key, normalizedHash); + } catch (error) { + log.error(`Failed to analyze query ${rawQuery.query}`, "query-sync"); + console.error(error); + throw error; + } finally { + sema.release(); + } + })); + return results + .filter((r): r is PromiseFulfilledResult => + r.status === "fulfilled" + ) + .map((r) => r.value); +} diff --git a/src/sync/seen-cache.ts b/src/sync/seen-cache.ts deleted file mode 100644 index c92adc04..00000000 --- a/src/sync/seen-cache.ts +++ /dev/null @@ -1,126 +0,0 @@ -import { normalizedFingerprint, type Postgres } from "@query-doctor/core"; -import { - QueryHash, - RawRecentQuery, - RecentQuery, -} from "../sql/recent-query.ts"; -import { fingerprint, parse } from "@libpg-query/parser"; -import { Sema } from "async-sema"; -import { log } from "../log.ts"; - -interface CacheEntry { - firstSeen: number; - lastSeen: number; -} - -export class QueryCache { - private list: Record = {}; - private readonly createdAt: number; - - constructor() { - this.createdAt = Date.now(); - } - - isCached(key: QueryHash): boolean { - const entry = this.list[key]; - if (!entry) { - return false; - } - return true; - } - - isNew(key: QueryHash): boolean { - const entry = this.list[key]; - if (!entry) { - return true; - } - return entry.firstSeen >= this.createdAt; - } - - async store(recentQuery: RawRecentQuery): Promise { - const key = await this.hash(recentQuery.query); - const now = Date.now(); - if (this.list[key]) { - this.list[key].lastSeen = now; - } else { - this.list[key] = { firstSeen: now, lastSeen: now }; - } - return key; - } - - getFirstSeen(key: QueryHash): number { - return this.list[key]?.firstSeen || Date.now(); - } - - private static readonly MAX_CONCURRENCY = 10; - - async sync(rawQueries: RawRecentQuery[]): Promise { - const sema = new Sema(QueryCache.MAX_CONCURRENCY); - const results = await Promise.allSettled(rawQueries.map(async (rawQuery) => { - await sema.acquire(); - try { - const key = await this.store(rawQuery); - const normalizedHash = QueryHash.parse( - await normalizedFingerprint(await parse(rawQuery.query), fingerprint), - ); - return await RecentQuery.analyze( - rawQuery, - key, - normalizedHash, - this.getFirstSeen(key), - ); - } catch (error) { - log.error(`Failed to analyze query ${rawQuery.query}`, "query-cache") - console.error(error) - throw error; - } finally { - sema.release(); - } - })); - return results - .filter((r): r is PromiseFulfilledResult => r.status === "fulfilled") - .map((r) => r.value); - } - - reset(): void { - this.list = {}; - } - - private async hash(query: string): Promise { - return QueryHash.parse(await fingerprint(query)); - } -} - -/** - * A top-level cache that segments queries by the db instance they're associated with - */ -export class SegmentedQueryCache { - // weak reference to the db instance to allow cache to be garbage collected - // when the connection to the database is closed. - // Can be relevant for - private readonly dbs: WeakMap = new WeakMap(); - - sync(db: Postgres, queries: RawRecentQuery[]): Promise { - const cache = this.getOrCreateCache(db); - return cache.sync(queries); - } - - store(db: Postgres, query: RawRecentQuery) { - const cache = this.getOrCreateCache(db); - return cache.store(query); - } - - reset(db: Postgres) { - const cache = this.getOrCreateCache(db); - return cache.reset(); - } - - private getOrCreateCache(db: Postgres): QueryCache { - let cache = this.dbs.get(db); - if (!cache) { - cache = new QueryCache(); - this.dbs.set(db, cache); - } - return cache; - } -}