diff --git a/src/remote/query-optimizer.ts b/src/remote/query-optimizer.ts index 787fbc1..35f6b53 100644 --- a/src/remote/query-optimizer.ts +++ b/src/remote/query-optimizer.ts @@ -112,6 +112,25 @@ export class QueryOptimizer extends EventEmitter { return validQueries; } + stop() { + this.semaphore = new Sema(QueryOptimizer.MAX_CONCURRENCY); + this.queries.clear(); + this.target = undefined; + this._finish = Promise.withResolvers(); + this._allQueries = 0; + this._invalidQueries = 0; + this._validQueriesProcessed = 0; + } + + restart() { + this.semaphore = new Sema(QueryOptimizer.MAX_CONCURRENCY); + this.queries.clear(); + this._finish = Promise.withResolvers(); + this._allQueries = 0; + this._invalidQueries = 0; + this._validQueriesProcessed = 0; + } + /** * Insert new queries to be processed. The {@link start} method must * have been called previously for this to take effect @@ -153,16 +172,6 @@ export class QueryOptimizer extends EventEmitter { return validQueries; } - stop() { - this.semaphore = new Sema(QueryOptimizer.MAX_CONCURRENCY); - this.queries.clear(); - this.target = undefined; - this._allQueries = 0; - this._finish = Promise.withResolvers(); - this._invalidQueries = 0; - this._validQueriesProcessed = 0; - } - private async work() { if (!this.target) { return; @@ -309,7 +318,12 @@ export class QueryOptimizer extends EventEmitter { Math.abs(percentageReduction), ); if (costReductionPercentage < MINIMUM_COST_CHANGE_PERCENTAGE) { - this.onNoImprovements(recent, result.baseCost, indexesUsed, explainPlan); + this.onNoImprovements( + recent, + result.baseCost, + indexesUsed, + explainPlan, + ); return { state: "no_improvement_found", cost: result.baseCost, diff --git a/src/remote/remote-controller.ts b/src/remote/remote-controller.ts index bd1ffd5..b3bc96c 100644 --- a/src/remote/remote-controller.ts +++ b/src/remote/remote-controller.ts @@ -1,8 +1,9 @@ import { env } from "../env.ts"; import { log } from "../log.ts"; -import { OptimizedQuery } from "../sql/recent-query.ts"; import { RemoteSyncRequest } from "./remote.dto.ts"; import { Remote } from "./remote.ts"; +import * as errors from "../sync/errors.ts"; +import type { OptimizedQuery } from "../sql/recent-query.ts"; const SyncStatus = { NOT_STARTED: "notStarted", @@ -42,9 +43,9 @@ export class RemoteController { } else if (request.method === "GET") { return this.getStatus(); } - } - - if (url.pathname === "/postgres/reset" && request.method === "POST") { + } else if ( + url.pathname === "/postgres/reset" && request.method === "POST" + ) { return await this.onReset(request); } } @@ -124,6 +125,12 @@ export class RemoteController { return Response.json({ success: true }); } catch (error) { console.error(error); + if (error instanceof errors.PostgresError) { + return error.toResponse(); + } + if (error instanceof errors.ExtensionNotInstalledError) { + return error.toResponse(); + } return Response.json({ error: error instanceof Error ? error.message : "Unknown error", }, { status: 500 }); diff --git a/src/remote/remote.ts b/src/remote/remote.ts index 62aa3b5..477a10c 100644 --- a/src/remote/remote.ts +++ b/src/remote/remote.ts @@ -274,6 +274,7 @@ export class Remote extends EventEmitter { async resetPgStatStatements(source: Connectable): Promise { const connector = this.sourceManager.getConnectorFor(source); await connector.resetPgStatStatements(); + this.optimizer.restart(); } /** diff --git a/src/server/http.ts b/src/server/http.ts index c524d08..e62a16e 100644 --- a/src/server/http.ts +++ b/src/server/http.ts @@ -7,7 +7,6 @@ import { ZodError } from "zod"; import { shutdownController } from "../shutdown.ts"; import { env } from "../env.ts"; import { SyncResult } from "../sync/syncer.ts"; -import { connectToOptimizer, connectToSource } from "../sql/postgresjs.ts"; import type { RateLimitResult } from "@rabbit-company/rate-limiter"; import * as errors from "../sync/errors.ts"; import { RemoteController } from "../remote/remote-controller.ts"; @@ -122,38 +121,6 @@ async function onSyncLiveQuery(req: Request) { } } -async function onReset(req: Request) { - let body: LiveQueryRequest; - try { - body = LiveQueryRequest.parse(await req.json()); - } catch (e: unknown) { - if (e instanceof ZodError) { - return Response.json( - { - kind: "error", - type: "invalid_body", - error: e.issues.map((issue) => issue.message).join("\n"), - }, - { status: 400 }, - ); - } - throw e; - } - try { - await syncer.reset(body.db); - return Response.json({ kind: "ok" }, { status: 500 }); - } catch (error) { - if (error instanceof errors.PostgresError) { - return error.toResponse(); - } - if (error instanceof errors.ExtensionNotInstalledError) { - return error.toResponse(); - } - - return makeUnexpectedErrorResponse(error); - } -} - export function createServer( hostname: string, port: number, @@ -217,12 +184,6 @@ export function createServer( } const res = await onSyncLiveQuery(req); return transformResponse(res, limit); - } else if (url.pathname === "/postgres/reset") { - if (req.method !== "POST") { - return new Response("Method not allowed", { status: 405 }); - } - const res = await onReset(req); - return transformResponse(res, limit); } const remoteResponse = await remoteController?.execute(req); if (remoteResponse) { diff --git a/src/sync/syncer.ts b/src/sync/syncer.ts index d9ea142..3f168f3 100644 --- a/src/sync/syncer.ts +++ b/src/sync/syncer.ts @@ -141,15 +141,4 @@ export class PostgresSyncer { const deltas = this.differ.put(connectable, schema); return { queries, deltas }; } - - /** - * @throws {ExtensionNotInstalledError} - * @throws {PostgresError} - */ - async reset( - connectable: Connectable, - ): Promise { - const connector = this.manager.getConnectorFor(connectable); - await connector.resetPgStatStatements(); - } }