diff --git a/src/remote/query-optimizer.test.ts b/src/remote/query-optimizer.test.ts index e67c41e..59bbff1 100644 --- a/src/remote/query-optimizer.test.ts +++ b/src/remote/query-optimizer.test.ts @@ -5,7 +5,7 @@ import { Connectable } from "../sync/connectable.ts"; import { setTimeout } from "node:timers/promises"; import { assertArrayIncludes } from "@std/assert/array-includes"; import { assert, assertGreater } from "@std/assert"; -import { RecentQuery, type OptimizedQuery } from "../sql/recent-query.ts"; +import { type OptimizedQuery, RecentQuery } from "../sql/recent-query.ts"; Deno.test({ name: "controller syncs correctly", @@ -48,7 +48,8 @@ Deno.test({ .start(); const manager = ConnectionManager.forLocalDatabase(); - const optimizer = new QueryOptimizer(manager); + const conn = Connectable.fromString(pg.getConnectionUri()); + const optimizer = new QueryOptimizer(manager, conn); const expectedImprovements = ["select * from testing where a = $1"]; const expectedNoImprovements = [ @@ -71,11 +72,10 @@ Deno.test({ noImprovements.push(query.query); }); - const conn = Connectable.fromString(pg.getConnectionUri()); const connector = manager.getConnectorFor(conn); try { const recentQueries = await connector.getRecentQueries(); - const includedQueries = await optimizer.start(conn, recentQueries, { + const includedQueries = await optimizer.start(recentQueries, { kind: "fromStatisticsExport", source: { kind: "inline" }, stats: [{ @@ -149,7 +149,7 @@ Deno.test({ assertArrayIncludes(expectedNoImprovements, noImprovements); console.log("improvements 1", improvements); console.log("no improvements 1", noImprovements); - await optimizer.start(conn, recentQueries, { + await optimizer.start(recentQueries, { kind: "fromStatisticsExport", source: { kind: "inline" }, stats: [{ @@ -187,7 +187,9 @@ Deno.test({ sanitizeOps: false, sanitizeResources: false, fn: async () => { - const pg = await new PostgreSqlContainer("timescale/timescaledb:latest-pg16") + const pg = await new PostgreSqlContainer( + "timescale/timescaledb:latest-pg16", + ) .withCopyContentToContainer([ { content: ` @@ -248,7 +250,8 @@ Deno.test({ .start(); const manager = ConnectionManager.forLocalDatabase(); - const optimizer = new QueryOptimizer(manager); + const conn = Connectable.fromString(pg.getConnectionUri()); + const optimizer = new QueryOptimizer(manager, conn); const improvementsWithRecommendations: OptimizedQuery[] = []; @@ -259,12 +262,11 @@ Deno.test({ improvementsWithRecommendations.push(query); }); - const conn = Connectable.fromString(pg.getConnectionUri()); const connector = manager.getConnectorFor(conn); try { const recentQueries = await connector.getRecentQueries(); - await optimizer.start(conn, recentQueries, { + await optimizer.start(recentQueries, { kind: "fromStatisticsExport", source: { kind: "inline" }, stats: [ diff --git a/src/remote/query-optimizer.ts b/src/remote/query-optimizer.ts index f1bf79b..55dc86c 100644 --- a/src/remote/query-optimizer.ts +++ b/src/remote/query-optimizer.ts @@ -26,10 +26,11 @@ type EventMap = { zeroCostPlan: [OptimizedQuery]; noImprovements: [OptimizedQuery]; improvementsAvailable: [OptimizedQuery]; + vacuumStart: []; + vacuumEnd: []; }; type Target = { - connectable: Connectable; optimizer: IndexOptimizer; statistics: Statistics; }; @@ -51,8 +52,12 @@ export class QueryOptimizer extends EventEmitter { private _allQueries = 0; private running = false; + private queriedSinceVacuum = 0; + private static readonly vacuumThreshold = 5; + constructor( private readonly manager: ConnectionManager, + private readonly connectable: Connectable, ) { super(); } @@ -84,14 +89,13 @@ export class QueryOptimizer extends EventEmitter { * Resolves when all queries are optimized */ async start( - conn: Connectable, allRecentQueries: RecentQuery[], statsMode: StatisticsMode = QueryOptimizer.defaultStatistics, ): Promise { this.stop(); const validQueries = this.appendQueries(allRecentQueries); const version = PostgresVersion.parse("17"); - const pg = this.manager.getOrCreateConnection(conn); + const pg = this.manager.getOrCreateConnection(this.connectable); const ownStats = await Statistics.dumpStats(pg, version, "full"); const statistics = new Statistics( pg, @@ -105,7 +109,7 @@ export class QueryOptimizer extends EventEmitter { // so traces have to be disabled trace: false, }); - this.target = { connectable: conn, optimizer, statistics }; + this.target = { optimizer, statistics }; this._allQueries = this.queries.size; await this.work(); @@ -206,6 +210,11 @@ export class QueryOptimizer extends EventEmitter { optimized, this.target, ); + this.queriedSinceVacuum++; + if (this.queriedSinceVacuum > QueryOptimizer.vacuumThreshold) { + await this.vacuum(); + this.queriedSinceVacuum = 0; + } this.queries.set( optimized.hash, @@ -224,6 +233,16 @@ export class QueryOptimizer extends EventEmitter { return Array.from(this.queries.values()); } + private async vacuum() { + const connector = this.manager.getConnectorFor(this.connectable); + try { + this.emit("vacuumStart"); + await connector.vacuum(); + } finally { + this.emit("vacuumEnd"); + } + } + private checkQueryUnsupported( query: RecentQuery, ): { type: "ok" } | { type: "ignored" } | { diff --git a/src/remote/remote.ts b/src/remote/remote.ts index 477a10c..ba860e3 100644 --- a/src/remote/remote.ts +++ b/src/remote/remote.ts @@ -70,7 +70,7 @@ export class Remote extends EventEmitter { super(); this.baseDbURL = targetURL.withDatabaseName(Remote.baseDbName); this.optimizingDbUDRL = targetURL.withDatabaseName(Remote.optimizingDbName); - this.optimizer = new QueryOptimizer(manager); + this.optimizer = new QueryOptimizer(manager, this.optimizingDbUDRL); } async syncFrom( @@ -291,7 +291,7 @@ export class Remote extends EventEmitter { await postgres.exec("drop schema if exists extensions cascade"); } this.startQueryLoader(source); - this.optimizer.start(this.optimizingDbUDRL, recentQueries, stats); + this.optimizer.start(recentQueries, stats); } private startQueryLoader(source: Connectable) { diff --git a/src/sync/pg-connector.ts b/src/sync/pg-connector.ts index 2b6770b..193bfcb 100644 --- a/src/sync/pg-connector.ts +++ b/src/sync/pg-connector.ts @@ -12,7 +12,11 @@ import type { import { log } from "../log.ts"; import { shutdownController } from "../shutdown.ts"; import { withSpan } from "../otel.ts"; -import { PgIdentifier, Postgres } from "@query-doctor/core"; +import { + PgIdentifier, + Postgres, + PostgresQueryBuilder, +} from "@query-doctor/core"; import { SegmentedQueryCache } from "./seen-cache.ts"; import { FullSchema, FullSchemaColumn } from "./schema_differ.ts"; import { ExtensionNotInstalledError, PostgresError } from "./errors.ts"; @@ -521,6 +525,17 @@ ORDER BY } } + public async vacuum(): Promise { + const vacuumAnalyze = new PostgresQueryBuilder("vacuum analyze") + .introspect() + .build(); + try { + await this.db.exec(vacuumAnalyze); + } catch (err) { + throw new PostgresError(err instanceof Error ? err.message : String(err)); + } + } + public async checkPrivilege(): Promise<{ username: string; isSuperuser: boolean;