Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"@libpg-query/parser": "^17.6.3",
"@opentelemetry/api": "^1.9.0",
"@pgsql/types": "^17.6.2",
"@query-doctor/core": "^0.10.5",
"@query-doctor/core": "^0.10.6",
"async-sema": "^3.1.1",
"capnweb": "^0.7.0",
"dedent": "^1.7.1",
Expand Down
4 changes: 2 additions & 2 deletions src/remote/api-client.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { newWebSocketRpcSession, RpcTarget } from "capnweb";
import type { RpcStub } from "capnweb";
import type { ConnectionMode, UnauthenticatedServerApi, ClientApi, IndexDefinition, ServerApi, RecentQuery } from "@query-doctor/core";
import type { ConnectionMode, UnauthenticatedServerApi, ClientApi, IndexDefinition, ServerApi } from "@query-doctor/core";
import type { ExportedStats } from "@query-doctor/core";
import { PgIdentifier, Statistics } from "@query-doctor/core";
import { log } from "../log.ts";
import type { Remote } from "./remote.ts";
import type { OptimizedQuery } from "../sql/recent-query.ts";
import type { OptimizedQuery, RecentQuery } from "../sql/recent-query.ts";

export function hookUpApiReporter(api: RpcStub<ServerApi>, remote: Remote): () => void {
const onExtensionPresenceChanged = (presence: Parameters<typeof api.setExtensionPresence>[0]) => {
Expand Down
1 change: 0 additions & 1 deletion src/remote/query-loader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ function createMockRecentQuery(query: string): RecentQuery {
nudges: [],
hash: "test_hash" as QueryHash,
normalizedHash: "test_normalized_hash" as QueryHash,
seenAt: Date.now(),
optimization: { state: "waiting" },
withOptimization: function () {
return this as OptimizedQuery;
Expand Down
1 change: 0 additions & 1 deletion src/remote/query-optimizer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ test("controller syncs correctly", async () => {
[],
0 as any,
0 as any,
1,
),
]);
expect(
Expand Down
5 changes: 2 additions & 3 deletions src/sql/pgbadger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { statSync } from "node:fs";
import csv from "fast-csv";
import type { RawRecentQuery, RecentQuery } from "./recent-query.ts";
import { preprocessEncodedJson } from "./json.ts";
import { QueryCache } from "../sync/seen-cache.ts";
import { syncQueries } from "../sync/query-sync.ts";
import type { RecentQuerySource } from "./recent-query.ts";

const INTROSPECTION_MARKER = "@qd_introspection";
Expand Down Expand Up @@ -60,7 +60,6 @@ export class PgbadgerSource implements RecentQuerySource {

constructor(
private readonly logPath: string,
private readonly cache: QueryCache = new QueryCache(),
) {
this.logSize = statSync(this.logPath).size;
console.log(`logPath=${this.logPath},fileSize=${this.logSize}`);
Expand Down Expand Up @@ -95,6 +94,6 @@ export class PgbadgerSource implements RecentQuerySource {
this.totalRows++;
}
console.log("Finished pgbadger stream");
return this.cache.sync(rawQueries);
return syncQueries(rawQueries);
}
}
35 changes: 16 additions & 19 deletions src/sql/recent-query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,15 @@ test("isTargetlessSelectQuery returns false when table references exist", () =>

test("constructor sets derived boolean properties correctly for a SELECT on user tables", () => {
const refs: TableReference[] = [{ table: "users", schema: "public" }];
const rq = new RecentQuery(makeRawQuery(), refs, [], [], [], testHash, testNormalizedHash, 1000);
const rq = new RecentQuery(makeRawQuery(), refs, [], [], [], testHash, testNormalizedHash);
expect(rq.isSelectQuery).toBe(true);
expect(rq.isSystemQuery).toBe(false);
expect(rq.isIntrospection).toBe(false);
expect(rq.isTargetlessSelectQuery).toBe(false);
});

test("constructor sets isTargetlessSelectQuery=true for SELECT with no table refs", () => {
const rq = new RecentQuery(makeRawQuery(), [], [], [], [], testHash, testNormalizedHash, 1000);
const rq = new RecentQuery(makeRawQuery(), [], [], [], [], testHash, testNormalizedHash);
expect(rq.isSelectQuery).toBe(true);
expect(rq.isTargetlessSelectQuery).toBe(true);
});
Expand All @@ -160,7 +160,6 @@ test("constructor sets isTargetlessSelectQuery=false for non-SELECT even with em
[],
testHash,
testNormalizedHash,
1000,
);
expect(rq.isSelectQuery).toBe(false);
expect(rq.isTargetlessSelectQuery).toBe(false);
Expand All @@ -176,7 +175,7 @@ test("constructor copies all data fields from RawRecentQuery", () => {
rows: "0",
topLevel: false,
});
const rq = new RecentQuery(data, [], [], [], [], testHash, testNormalizedHash, 1000);
const rq = new RecentQuery(data, [], [], [], [], testHash, testNormalizedHash);
expect(rq.username).toBe("admin");
expect(rq.query).toBe("SELECT 1");
expect(rq.formattedQuery).toBe("SELECT\n 1");
Expand All @@ -185,13 +184,12 @@ test("constructor copies all data fields from RawRecentQuery", () => {
expect(rq.rows).toBe("0");
expect(rq.topLevel).toBe(false);
expect(rq.hash).toBe(testHash);
expect(rq.seenAt).toBe(1000);
});

// --- withOptimization ---

test("withOptimization attaches optimization to the instance", () => {
const rq = new RecentQuery(makeRawQuery(), [], [], [], [], testHash, testNormalizedHash, 1000);
const rq = new RecentQuery(makeRawQuery(), [], [], [], [], testHash, testNormalizedHash);
const optimization = { plan: "mock plan" } as any;
const optimized = rq.withOptimization(optimization);
expect(optimized.optimization).toBe(optimization);
Expand All @@ -203,10 +201,9 @@ test("withOptimization attaches optimization to the instance", () => {

test("analyze produces a RecentQuery with formatted query and analysis", async () => {
const data = makeRawQuery({ query: "SELECT id FROM users WHERE id = $1" });
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 2000);
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash);
expect(rq).toBeInstanceOf(RecentQuery);
expect(rq.hash).toBe(testHash);
expect(rq.seenAt).toBe(2000);
// The formatted query should have uppercase keywords
expect(rq.formattedQuery).toMatch(/SELECT/);
// Table references should include 'users'
Expand All @@ -216,23 +213,23 @@ test("analyze produces a RecentQuery with formatted query and analysis", async (
test("analyze throws on unparseable SQL", async () => {
const data = makeRawQuery({ query: "THIS IS NOT VALID SQL AT ALL !!!" });
await expect(
RecentQuery.analyze(data, testHash, testNormalizedHash, 3000),
RecentQuery.analyze(data, testHash, testNormalizedHash),
).rejects.toThrow();
});

// --- statementType-based isSelectQuery via analyze ---

test("analyze sets isSelectQuery=true for SELECT", async () => {
const data = makeRawQuery({ query: "SELECT * FROM users" });
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000);
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash);
expect(rq.isSelectQuery).toBe(true);
});

test("analyze sets isSelectQuery=true for CTE with SELECT", async () => {
const data = makeRawQuery({
query: "WITH cte AS (SELECT id FROM users) SELECT * FROM cte",
});
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000);
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash);
expect(rq.isSelectQuery).toBe(true);
});

Expand All @@ -241,15 +238,15 @@ test("analyze sets isSelectQuery=false for UPDATE even with SELECT subquery", as
query:
'UPDATE "public"."jobs" SET "state" = $1 FROM (SELECT id FROM "public"."jobs" WHERE state = $2 LIMIT 10) AS s1 WHERE "jobs".id = s1.id',
});
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000);
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash);
expect(rq.isSelectQuery).toBe(false);
});

test("analyze sets isSelectQuery=false for INSERT ... SELECT", async () => {
const data = makeRawQuery({
query: "INSERT INTO archive SELECT * FROM users WHERE active = false",
});
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000);
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash);
expect(rq.isSelectQuery).toBe(false);
});

Expand All @@ -258,7 +255,7 @@ test("analyze sets isSelectQuery=false for DELETE with EXISTS subquery", async (
query:
"DELETE FROM users WHERE EXISTS (SELECT 1 FROM banned WHERE banned.user_id = users.id)",
});
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000);
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash);
expect(rq.isSelectQuery).toBe(false);
});

Expand All @@ -269,7 +266,7 @@ test("analyze populates displayQuery for wide SELECTs", async () => {
query:
'SELECT "u"."id", "u"."email", "u"."first_name", "u"."last_name", "u"."created_at", "u"."updated_at", "u"."stripe_customer_id" FROM "users" "u" WHERE "u"."id" = $1',
});
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000);
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash);
// Normalize whitespace because the analyzer prettier-formats the query
// before compacting; the site applies the same normalization on render.
const normalized = rq.displayQuery?.replace(/\s+/g, " ").trim();
Expand All @@ -280,7 +277,7 @@ test("analyze populates displayQuery for wide SELECTs", async () => {

test("analyze leaves displayQuery undefined for narrow SELECTs", async () => {
const data = makeRawQuery({ query: "SELECT id FROM users WHERE id = $1" });
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000);
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash);
expect(rq.displayQuery).toBeUndefined();
});

Expand All @@ -289,7 +286,7 @@ test("analyze leaves displayQuery undefined for non-SELECTs", async () => {
query:
"INSERT INTO archive SELECT a, b, c, d, e, f, g, h FROM users WHERE active = false",
});
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000);
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash);
expect(rq.displayQuery).toBeUndefined();
});

Expand All @@ -298,14 +295,14 @@ test("analyze leaves displayQuery undefined for UNION", async () => {
query:
"SELECT a, b, c, d, e, f, g FROM t UNION SELECT a, b, c, d, e, f, g FROM u",
});
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000);
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash);
expect(rq.displayQuery).toBeUndefined();
});


test("analyze strips sqlcommenter tags from formattedQuery", async () => {
const data = makeRawQuery({ query: "select 1 /*a='1',b='2'*/" });
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash, 1000);
const rq = await RecentQuery.analyze(data, testHash, testNormalizedHash);
expect(rq.tags).toEqual([
{ key: "a", value: "1" },
{ key: "b", value: "2" },
Expand Down
9 changes: 1 addition & 8 deletions src/sql/recent-query.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as prettier from "prettier";
import prettierPluginSql from "prettier-plugin-sql";
import type { SegmentedQueryCache } from "../sync/seen-cache.ts";
import {
Analyzer,
compactSelectList,
Expand All @@ -18,8 +17,7 @@ import { log } from "../log.ts";
import type { LiveQueryOptimization } from "../remote/optimization.ts";

/**
* Constructed by syncing with {@link SegmentedQueryCache.sync}
* and supplying the date the query was last seen
* Constructed by syncing with {@link syncQueries}
*/
export class RecentQuery {
private static HARDCODED_LIMIT = 50;
Expand Down Expand Up @@ -50,7 +48,6 @@ export class RecentQuery {
readonly nudges: Nudge[],
readonly hash: QueryHash,
readonly normalizedHash: QueryHash,
readonly seenAt: number,
analysisSkipped = false,
statementType?: StatementType,
) {
Expand Down Expand Up @@ -101,7 +98,6 @@ export class RecentQuery {
nudges: this.nudges,
hash: this.hash,
normalizedHash: this.normalizedHash,
seenAt: this.seenAt,
optimization: this.optimization,
}));
}
Expand All @@ -117,7 +113,6 @@ export class RecentQuery {
data: RawRecentQuery,
hash: QueryHash,
normalizedHash: QueryHash,
seenAt: number,
) {
if (data.query.length > RecentQuery.MAX_ANALYZABLE_QUERY_SIZE) {
return new RecentQuery(
Expand All @@ -128,7 +123,6 @@ export class RecentQuery {
[],
hash,
normalizedHash,
seenAt,
true,
);
}
Expand All @@ -149,7 +143,6 @@ export class RecentQuery {
analysis.nudges,
hash,
normalizedHash,
seenAt,
false,
analysis.statementType,
);
Expand Down
5 changes: 1 addition & 4 deletions src/sync/connection-manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { Postgres } from "@query-doctor/core";
import { SegmentedQueryCache } from "./seen-cache.ts";
import { Connectable } from "./connectable.ts";
import { PostgresConnector } from "./pg-connector.ts";
import { connectToOptimizer, connectToSource } from "../sql/postgresjs.ts";
Expand All @@ -8,8 +7,6 @@ import { connectToOptimizer, connectToSource } from "../sql/postgresjs.ts";
* Manages connections and query caches for each connection
*/
export class ConnectionManager {
readonly segmentedQueryCache = new SegmentedQueryCache();

// This prevents connections being garbage collected.
// ConnectionMap should be responsible for closing connections
private readonly connections = new Map<string, Postgres>();
Expand Down Expand Up @@ -48,7 +45,7 @@ export class ConnectionManager {
const sql = input instanceof Connectable
? this.getOrCreateConnection(input)
: input;
return new PostgresConnector(sql, this.segmentedQueryCache);
return new PostgresConnector(sql);
}

async close(connectable: Connectable): Promise<void> {
Expand Down
7 changes: 2 additions & 5 deletions src/sync/pg-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
PostgresQueryBuilder,
dumpQueriesSql,
} from "@query-doctor/core";
import { SegmentedQueryCache } from "./seen-cache.ts";
import { syncQueries } from "./query-sync.ts";
import { FullSchema, FullSchemaColumn } from "@query-doctor/core";
import { ExtensionNotInstalledError, PostgresError } from "./errors.ts";
import { RawRecentQuery, RecentQuery } from "../sql/recent-query.ts";
Expand Down Expand Up @@ -96,7 +96,6 @@ export class PostgresConnector implements DatabaseConnector<PostgresTuple>, Rece
private static readonly MIN_SIZE_FOR_TABLESAMPLE = 10_000;
constructor(
private readonly db: Postgres,
private readonly segmentedQueryCache: SegmentedQueryCache,
) { }

async onStartAnalyze(): Promise<void> {
Expand Down Expand Up @@ -494,7 +493,7 @@ ORDER BY
source.extensionName.toString() as "pg_stat_statements" | "pg_stat_monitor",
);
const results = await this.db.exec<RawRecentQuery>(sql);
return await this.segmentedQueryCache.sync(this.db, results);
return await syncQueries(results);
}
} catch (err) {
if (
Expand Down Expand Up @@ -526,8 +525,6 @@ ORDER BY
SELECT ${source.schema}.pg_stat_monitor_reset(); -- @qd_introspection
`);
}

this.segmentedQueryCache.reset(this.db);
} catch (err) {
if (
err instanceof Error &&
Expand Down
14 changes: 4 additions & 10 deletions src/sync/seen-cache.test.ts → src/sync/query-sync.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { test, expect, vi } from "vitest";
import { QueryCache } from "./seen-cache.ts";
import { syncQueries } from "./query-sync.ts";
import type { RawRecentQuery } from "../sql/recent-query.ts";

function makeRawQuery(query: string): RawRecentQuery {
Expand All @@ -15,32 +15,26 @@ function makeRawQuery(query: string): RawRecentQuery {
}

test("sync skips unparseable queries and returns parseable ones", async () => {
const cache = new QueryCache();

const validQuery = makeRawQuery("SELECT 1");
// DEALLOCATE $1 is a utility statement that the pg parser rejects
const invalidQuery = makeRawQuery("DEALLOCATE $1");

const results = await cache.sync([validQuery, invalidQuery]);
const results = await syncQueries([validQuery, invalidQuery]);

expect(results).toHaveLength(1);
expect(results[0].query).toContain("SELECT");
});

test("sync returns empty array when all queries fail", async () => {
const cache = new QueryCache();

const invalidQuery1 = makeRawQuery("DEALLOCATE $1");
const invalidQuery2 = makeRawQuery("not valid sql !!!");

const results = await cache.sync([invalidQuery1, invalidQuery2]);
const results = await syncQueries([invalidQuery1, invalidQuery2]);

expect(results).toHaveLength(0);
});

test("sync bounds concurrency to MAX_CONCURRENCY", async () => {
const cache = new QueryCache();

let peakConcurrent = 0;
let currentConcurrent = 0;

Expand All @@ -61,7 +55,7 @@ test("sync bounds concurrency to MAX_CONCURRENCY", async () => {
makeRawQuery(`SELECT ${i + 1}`),
);

await cache.sync(queries);
await syncQueries(queries);

expect(peakConcurrent).toBeLessThanOrEqual(10);
expect(peakConcurrent).toBeGreaterThan(1); // verify some concurrency exists
Expand Down
Loading
Loading