Skip to content

Commit 8b9ef4f

Browse files
committed
More stable abort handling.
1 parent b3a23ef commit 8b9ef4f

File tree

2 files changed

+16
-10
lines changed

2 files changed

+16
-10
lines changed

modules/module-postgres/src/replication/PostgresSnapshotter.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ export class PostgresSnapshotter {
7070

7171
private connections: PgManager;
7272

73-
private abort_signal: AbortSignal;
73+
private abortSignal: AbortSignal;
7474

7575
private snapshotChunkLength: number;
7676

@@ -94,7 +94,7 @@ export class PostgresSnapshotter {
9494
this.connections = options.connections;
9595
this.snapshotChunkLength = options.snapshotChunkLength ?? 10_000;
9696

97-
this.abort_signal = options.abort_signal;
97+
this.abortSignal = options.abort_signal;
9898
}
9999

100100
async getQualifiedTableNames(
@@ -295,7 +295,7 @@ export class PostgresSnapshotter {
295295
// In those cases, we have to start replication from scratch.
296296
// If there is an existing healthy slot, we can skip this and continue
297297
// initial replication where we left off.
298-
await this.storage.clear({ signal: this.abort_signal });
298+
await this.storage.clear({ signal: this.abortSignal });
299299

300300
await db.query({
301301
statement: 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = $1',
@@ -349,11 +349,11 @@ export class PostgresSnapshotter {
349349
// Special case where we start with no tables to snapshot
350350
await this.markSnapshotDone();
351351
}
352-
while (!this.abort_signal.aborted) {
352+
while (!this.abortSignal.aborted) {
353353
const table = this.queue.values().next().value;
354354
if (table == null) {
355355
this.initialSnapshotDone.resolve();
356-
await timers.setTimeout(500, { signal: this.abort_signal });
356+
await timers.setTimeout(500, { signal: this.abortSignal });
357357
continue;
358358
}
359359

@@ -363,7 +363,9 @@ export class PostgresSnapshotter {
363363
await this.markSnapshotDone();
364364
}
365365
}
366+
throw new ReplicationAbortedError();
366367
} catch (e) {
368+
// If initial snapshot already completed, this has no effect
367369
this.initialSnapshotDone.reject(e);
368370
throw e;
369371
}
@@ -404,7 +406,7 @@ export class PostgresSnapshotter {
404406
await this.storage.populatePersistentChecksumCache({
405407
// No checkpoint yet, but we do have the opId.
406408
maxOpId: lastOp,
407-
signal: this.abort_signal
409+
signal: this.abortSignal
408410
});
409411
}
410412
}
@@ -618,9 +620,9 @@ export class PostgresSnapshotter {
618620
this.logger.info(`Replicating ${table.qualifiedName} ${at}/${limited.length} for resnapshot`);
619621
}
620622

621-
if (this.abort_signal.aborted) {
623+
if (this.abortSignal.aborted) {
622624
// We only abort after flushing
623-
throw new ReplicationAbortedError(`Initial replication interrupted`);
625+
throw new ReplicationAbortedError(`Table snapshot interrupted`);
624626
}
625627
}
626628
}

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,17 +340,21 @@ export class WalStream {
340340
* Start replication loop, and continue until aborted or error.
341341
*/
342342
async replicate() {
343+
let streamPromise: Promise<void> | null = null;
344+
let loopPromise: Promise<void> | null = null;
343345
try {
344346
this.initPromise = this.initReplication();
345347
await this.initPromise;
346-
const streamPromise = this.streamChanges();
347-
const loopPromise = this.snapshotter.replicationLoop();
348+
streamPromise = this.streamChanges();
349+
loopPromise = this.snapshotter.replicationLoop();
348350
await Promise.race([loopPromise, streamPromise]);
349351
} catch (e) {
350352
await this.storage.reportError(e);
351353
throw e;
352354
} finally {
353355
this.abortController.abort();
356+
// Wait for both to finish, to ensure proper cleanup.
357+
await Promise.all([loopPromise?.catch((_) => {}), streamPromise?.catch((_) => {})]);
354358
}
355359
}
356360

0 commit comments

Comments
 (0)