From 4638e7886b4c900d9db206d39137b4553017d6dc Mon Sep 17 00:00:00 2001 From: Karen Chen Date: Fri, 1 Nov 2024 17:44:35 -0700 Subject: [PATCH] chore: directly retrieve connections from ICP when using the ICP --- .../plugins/read_write_splitting_plugin.ts | 53 +++++++++++++------ 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/common/lib/plugins/read_write_splitting_plugin.ts b/common/lib/plugins/read_write_splitting_plugin.ts index 6a5fa789..2ace0bd1 100644 --- a/common/lib/plugins/read_write_splitting_plugin.ts +++ b/common/lib/plugins/read_write_splitting_plugin.ts @@ -198,8 +198,21 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implement try { const targetClient = await this.pluginService.connect(writerHost, props); this.isWriterClientFromInternalPool = this.pluginService.getConnectionProvider(writerHost, props) instanceof InternalPooledConnectionProvider; - this.setWriterClient(targetClient, writerHost); - await this.switchCurrentTargetClientTo(this.writerTargetClient, writerHost); + if (!this.isWriterClientFromInternalPool) { + this.setWriterClient(targetClient, writerHost); + } else { + // Do not cache clients from internal pools. + logger.debug(Messages.get("ReadWriteSplittingPlugin.setWriterClient", writerHost.url)); + // Verify clients from internal pools. + if (!(await this.isTargetClientUsable(targetClient))) { + logger.debug(Messages.get("ReadWriteSplittingPlugin.failedToConnectToWriter", writerHost.url)); + } + if (this.isReaderClientFromInternalPool) { + // Release the previous pooled connection back to the pool. + await this.closeTargetClientIfIdle(this.pluginService.getCurrentClient().targetClient); + } + } + await this.switchCurrentTargetClientTo(targetClient, writerHost); } catch (any) { logger.warn(Messages.get("ReadWriteSplittingPlugin.failedToConnectToWriter", writerHost.url)); } @@ -303,8 +316,22 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implement return; } logger.debug(Messages.get("ReadWriteSplittingPlugin.successfullyConnectedToReader", readerHost.url)); - this.setReaderClient(targetClient, readerHost); - await this.switchCurrentTargetClientTo(this.readerTargetClient, this._readerHostInfo); + + if (!this.isReaderClientFromInternalPool) { + this.setReaderClient(targetClient, readerHost); + } else { + // Do not cache clients from internal pools. + logger.debug(Messages.get("ReadWriteSplittingPlugin.setReaderClient", readerHost.url)); + // Verify clients from internal pools. + if (!(await this.isTargetClientUsable(targetClient))) { + logger.debug(Messages.get("ReadWriteSplittingPlugin.failedToConnectToReader", readerHost.url)); + } + if (this.isWriterClientFromInternalPool) { + // Release the previous pooled connection back to the pool. + await this.closeTargetClientIfIdle(this.pluginService.getCurrentClient().targetClient); + } + } + await this.switchCurrentTargetClientTo(targetClient, readerHost); } async switchToWriterTargetClient(hosts: HostInfo[]) { @@ -318,16 +345,13 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implement if (!writerHost) { return; } - if (!(await this.isTargetClientUsable(this.writerTargetClient))) { + if (!this.writerTargetClient || this.isWriterClientFromInternalPool) { await this.getNewWriterClient(writerHost); - } else if (this.writerTargetClient) { + } else if (await this.isTargetClientUsable(this.writerTargetClient)) { await this.switchCurrentTargetClientTo(this.writerTargetClient, writerHost); } logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromReaderToWriter", writerHost.url)); - if (this.isReaderClientFromInternalPool) { - await this.closeTargetClientIfIdle(this.readerTargetClient); - } } async switchToReaderTargetClient(hosts: HostInfo[]) { @@ -338,23 +362,20 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implement } this._inReadWriteSplit = true; - if (!(await this.isTargetClientUsable(this.readerTargetClient))) { + if (!this.readerTargetClient || this.isReaderClientFromInternalPool) { await this.initializeReaderClient(hosts); - } else if (this.readerTargetClient != null && this._readerHostInfo != null) { + } else if (await this.isTargetClientUsable(this.readerTargetClient)) { try { await this.switchCurrentTargetClientTo(this.readerTargetClient, this._readerHostInfo); - logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader", this._readerHostInfo.url)); + logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader", this._readerHostInfo?.url ?? "")); } catch (error: any) { - logger.debug(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToCachedReader", this._readerHostInfo.url)); + logger.debug(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToCachedReader", this._readerHostInfo?.url ?? "")); await this.pluginService.abortTargetClient(this.readerTargetClient); this.readerTargetClient = undefined; this._readerHostInfo = undefined; await this.initializeReaderClient(hosts); } } - if (this.isWriterClientFromInternalPool) { - await this.closeTargetClientIfIdle(this.writerTargetClient); - } } async isTargetClientUsable(targetClient: ClientWrapper | undefined): Promise {