Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions src/remote/query-optimizer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Connectable } from "../sync/connectable.ts";
import { setTimeout } from "node:timers/promises";
import { assertArrayIncludes } from "@std/assert/array-includes";
import { assert, assertGreater } from "@std/assert";
import { RecentQuery, type OptimizedQuery } from "../sql/recent-query.ts";
import { type OptimizedQuery, RecentQuery } from "../sql/recent-query.ts";

Deno.test({
name: "controller syncs correctly",
Expand Down Expand Up @@ -48,7 +48,8 @@ Deno.test({
.start();

const manager = ConnectionManager.forLocalDatabase();
const optimizer = new QueryOptimizer(manager);
const conn = Connectable.fromString(pg.getConnectionUri());
const optimizer = new QueryOptimizer(manager, conn);

const expectedImprovements = ["select * from testing where a = $1"];
const expectedNoImprovements = [
Expand All @@ -71,11 +72,10 @@ Deno.test({
noImprovements.push(query.query);
});

const conn = Connectable.fromString(pg.getConnectionUri());
const connector = manager.getConnectorFor(conn);
try {
const recentQueries = await connector.getRecentQueries();
const includedQueries = await optimizer.start(conn, recentQueries, {
const includedQueries = await optimizer.start(recentQueries, {
kind: "fromStatisticsExport",
source: { kind: "inline" },
stats: [{
Expand Down Expand Up @@ -149,7 +149,7 @@ Deno.test({
assertArrayIncludes(expectedNoImprovements, noImprovements);
console.log("improvements 1", improvements);
console.log("no improvements 1", noImprovements);
await optimizer.start(conn, recentQueries, {
await optimizer.start(recentQueries, {
kind: "fromStatisticsExport",
source: { kind: "inline" },
stats: [{
Expand Down Expand Up @@ -187,7 +187,9 @@ Deno.test({
sanitizeOps: false,
sanitizeResources: false,
fn: async () => {
const pg = await new PostgreSqlContainer("timescale/timescaledb:latest-pg16")
const pg = await new PostgreSqlContainer(
"timescale/timescaledb:latest-pg16",
)
.withCopyContentToContainer([
{
content: `
Expand Down Expand Up @@ -248,7 +250,8 @@ Deno.test({
.start();

const manager = ConnectionManager.forLocalDatabase();
const optimizer = new QueryOptimizer(manager);
const conn = Connectable.fromString(pg.getConnectionUri());
const optimizer = new QueryOptimizer(manager, conn);

const improvementsWithRecommendations: OptimizedQuery[] = [];

Expand All @@ -259,12 +262,11 @@ Deno.test({
improvementsWithRecommendations.push(query);
});

const conn = Connectable.fromString(pg.getConnectionUri());
const connector = manager.getConnectorFor(conn);
try {
const recentQueries = await connector.getRecentQueries();

await optimizer.start(conn, recentQueries, {
await optimizer.start(recentQueries, {
kind: "fromStatisticsExport",
source: { kind: "inline" },
stats: [
Expand Down
27 changes: 23 additions & 4 deletions src/remote/query-optimizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ type EventMap = {
zeroCostPlan: [OptimizedQuery];
noImprovements: [OptimizedQuery];
improvementsAvailable: [OptimizedQuery];
vacuumStart: [];
vacuumEnd: [];
};

type Target = {
connectable: Connectable;
optimizer: IndexOptimizer;
statistics: Statistics;
};
Expand All @@ -51,8 +52,12 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
private _allQueries = 0;
private running = false;

private queriedSinceVacuum = 0;
private static readonly vacuumThreshold = 5;

constructor(
private readonly manager: ConnectionManager,
private readonly connectable: Connectable,
) {
super();
}
Expand Down Expand Up @@ -84,14 +89,13 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
* Resolves when all queries are optimized
*/
async start(
conn: Connectable,
allRecentQueries: RecentQuery[],
statsMode: StatisticsMode = QueryOptimizer.defaultStatistics,
): Promise<OptimizedQuery[]> {
this.stop();
const validQueries = this.appendQueries(allRecentQueries);
const version = PostgresVersion.parse("17");
const pg = this.manager.getOrCreateConnection(conn);
const pg = this.manager.getOrCreateConnection(this.connectable);
const ownStats = await Statistics.dumpStats(pg, version, "full");
const statistics = new Statistics(
pg,
Expand All @@ -105,7 +109,7 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
// so traces have to be disabled
trace: false,
});
this.target = { connectable: conn, optimizer, statistics };
this.target = { optimizer, statistics };

this._allQueries = this.queries.size;
await this.work();
Expand Down Expand Up @@ -206,6 +210,11 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
optimized,
this.target,
);
this.queriedSinceVacuum++;
if (this.queriedSinceVacuum > QueryOptimizer.vacuumThreshold) {
await this.vacuum();
this.queriedSinceVacuum = 0;
}

this.queries.set(
optimized.hash,
Expand All @@ -224,6 +233,16 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
return Array.from(this.queries.values());
}

private async vacuum() {
const connector = this.manager.getConnectorFor(this.connectable);
try {
this.emit("vacuumStart");
await connector.vacuum();
} finally {
this.emit("vacuumEnd");
}
}

private checkQueryUnsupported(
query: RecentQuery,
): { type: "ok" } | { type: "ignored" } | {
Expand Down
4 changes: 2 additions & 2 deletions src/remote/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class Remote extends EventEmitter<RemoteEvents> {
super();
this.baseDbURL = targetURL.withDatabaseName(Remote.baseDbName);
this.optimizingDbUDRL = targetURL.withDatabaseName(Remote.optimizingDbName);
this.optimizer = new QueryOptimizer(manager);
this.optimizer = new QueryOptimizer(manager, this.optimizingDbUDRL);
}

async syncFrom(
Expand Down Expand Up @@ -291,7 +291,7 @@ export class Remote extends EventEmitter<RemoteEvents> {
await postgres.exec("drop schema if exists extensions cascade");
}
this.startQueryLoader(source);
this.optimizer.start(this.optimizingDbUDRL, recentQueries, stats);
this.optimizer.start(recentQueries, stats);
}

private startQueryLoader(source: Connectable) {
Expand Down
17 changes: 16 additions & 1 deletion src/sync/pg-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import type {
import { log } from "../log.ts";
import { shutdownController } from "../shutdown.ts";
import { withSpan } from "../otel.ts";
import { PgIdentifier, Postgres } from "@query-doctor/core";
import {
PgIdentifier,
Postgres,
PostgresQueryBuilder,
} from "@query-doctor/core";
import { SegmentedQueryCache } from "./seen-cache.ts";
import { FullSchema, FullSchemaColumn } from "./schema_differ.ts";
import { ExtensionNotInstalledError, PostgresError } from "./errors.ts";
Expand Down Expand Up @@ -521,6 +525,17 @@ ORDER BY
}
}

public async vacuum(): Promise<void> {
const vacuumAnalyze = new PostgresQueryBuilder("vacuum analyze")
.introspect()
.build();
try {
await this.db.exec(vacuumAnalyze);
} catch (err) {
throw new PostgresError(err instanceof Error ? err.message : String(err));
}
}

public async checkPrivilege(): Promise<{
username: string;
isSuperuser: boolean;
Expand Down