Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions src/remote/query-optimizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,25 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
return validQueries;
}

stop() {
this.semaphore = new Sema(QueryOptimizer.MAX_CONCURRENCY);
this.queries.clear();
this.target = undefined;
this._finish = Promise.withResolvers();
this._allQueries = 0;
this._invalidQueries = 0;
this._validQueriesProcessed = 0;
}

restart() {
this.semaphore = new Sema(QueryOptimizer.MAX_CONCURRENCY);
this.queries.clear();
this._finish = Promise.withResolvers();
this._allQueries = 0;
this._invalidQueries = 0;
this._validQueriesProcessed = 0;
}

/**
* Insert new queries to be processed. The {@link start} method must
* have been called previously for this to take effect
Expand Down Expand Up @@ -153,16 +172,6 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
return validQueries;
}

stop() {
this.semaphore = new Sema(QueryOptimizer.MAX_CONCURRENCY);
this.queries.clear();
this.target = undefined;
this._allQueries = 0;
this._finish = Promise.withResolvers();
this._invalidQueries = 0;
this._validQueriesProcessed = 0;
}

private async work() {
if (!this.target) {
return;
Expand Down Expand Up @@ -309,7 +318,12 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
Math.abs(percentageReduction),
);
if (costReductionPercentage < MINIMUM_COST_CHANGE_PERCENTAGE) {
this.onNoImprovements(recent, result.baseCost, indexesUsed, explainPlan);
this.onNoImprovements(
recent,
result.baseCost,
indexesUsed,
explainPlan,
);
return {
state: "no_improvement_found",
cost: result.baseCost,
Expand Down
15 changes: 11 additions & 4 deletions src/remote/remote-controller.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { env } from "../env.ts";
import { log } from "../log.ts";
import { OptimizedQuery } from "../sql/recent-query.ts";
import { RemoteSyncRequest } from "./remote.dto.ts";
import { Remote } from "./remote.ts";
import * as errors from "../sync/errors.ts";
import type { OptimizedQuery } from "../sql/recent-query.ts";

const SyncStatus = {
NOT_STARTED: "notStarted",
Expand Down Expand Up @@ -42,9 +43,9 @@ export class RemoteController {
} else if (request.method === "GET") {
return this.getStatus();
}
}

if (url.pathname === "/postgres/reset" && request.method === "POST") {
} else if (
url.pathname === "/postgres/reset" && request.method === "POST"
) {
return await this.onReset(request);
}
}
Expand Down Expand Up @@ -124,6 +125,12 @@ export class RemoteController {
return Response.json({ success: true });
} catch (error) {
console.error(error);
if (error instanceof errors.PostgresError) {
return error.toResponse();
}
if (error instanceof errors.ExtensionNotInstalledError) {
return error.toResponse();
}
return Response.json({
error: error instanceof Error ? error.message : "Unknown error",
}, { status: 500 });
Expand Down
1 change: 1 addition & 0 deletions src/remote/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ export class Remote extends EventEmitter<RemoteEvents> {
async resetPgStatStatements(source: Connectable): Promise<void> {
const connector = this.sourceManager.getConnectorFor(source);
await connector.resetPgStatStatements();
this.optimizer.restart();
}

/**
Expand Down
39 changes: 0 additions & 39 deletions src/server/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { ZodError } from "zod";
import { shutdownController } from "../shutdown.ts";
import { env } from "../env.ts";
import { SyncResult } from "../sync/syncer.ts";
import { connectToOptimizer, connectToSource } from "../sql/postgresjs.ts";
import type { RateLimitResult } from "@rabbit-company/rate-limiter";
import * as errors from "../sync/errors.ts";
import { RemoteController } from "../remote/remote-controller.ts";
Expand Down Expand Up @@ -122,38 +121,6 @@ async function onSyncLiveQuery(req: Request) {
}
}

async function onReset(req: Request) {
let body: LiveQueryRequest;
try {
body = LiveQueryRequest.parse(await req.json());
} catch (e: unknown) {
if (e instanceof ZodError) {
return Response.json(
{
kind: "error",
type: "invalid_body",
error: e.issues.map((issue) => issue.message).join("\n"),
},
{ status: 400 },
);
}
throw e;
}
try {
await syncer.reset(body.db);
return Response.json({ kind: "ok" }, { status: 500 });
} catch (error) {
if (error instanceof errors.PostgresError) {
return error.toResponse();
}
if (error instanceof errors.ExtensionNotInstalledError) {
return error.toResponse();
}

return makeUnexpectedErrorResponse(error);
}
}

export function createServer(
hostname: string,
port: number,
Expand Down Expand Up @@ -217,12 +184,6 @@ export function createServer(
}
const res = await onSyncLiveQuery(req);
return transformResponse(res, limit);
} else if (url.pathname === "/postgres/reset") {
if (req.method !== "POST") {
return new Response("Method not allowed", { status: 405 });
}
const res = await onReset(req);
return transformResponse(res, limit);
}
const remoteResponse = await remoteController?.execute(req);
if (remoteResponse) {
Expand Down
11 changes: 0 additions & 11 deletions src/sync/syncer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,4 @@ export class PostgresSyncer {
const deltas = this.differ.put(connectable, schema);
return { queries, deltas };
}

/**
* @throws {ExtensionNotInstalledError}
* @throws {PostgresError}
*/
async reset(
connectable: Connectable,
): Promise<void> {
const connector = this.manager.getConnectorFor(connectable);
await connector.resetPgStatStatements();
}
}