diff --git a/packages/kernel-errors/src/constants.ts b/packages/kernel-errors/src/constants.ts index 2c98f6b43..fc5c55f8d 100644 --- a/packages/kernel-errors/src/constants.ts +++ b/packages/kernel-errors/src/constants.ts @@ -33,6 +33,7 @@ export const ErrorCode = { SubclusterNotFound: 'SUBCLUSTER_NOT_FOUND', SampleGenerationError: 'SAMPLE_GENERATION_ERROR', InternalError: 'INTERNAL_ERROR', + ResourceLimitError: 'RESOURCE_LIMIT_ERROR', } as const; export type ErrorCode = (typeof ErrorCode)[keyof typeof ErrorCode]; diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.test.ts b/packages/kernel-errors/src/errors/ResourceLimitError.test.ts new file mode 100644 index 000000000..ae2bf6da4 --- /dev/null +++ b/packages/kernel-errors/src/errors/ResourceLimitError.test.ts @@ -0,0 +1,233 @@ +import { describe, it, expect } from 'vitest'; + +import { ResourceLimitError } from './ResourceLimitError.ts'; +import { ErrorCode, ErrorSentinel } from '../constants.ts'; +import { unmarshalErrorOptions } from '../marshal/unmarshalError.ts'; +import type { MarshaledOcapError } from '../types.ts'; + +describe('ResourceLimitError', () => { + it('creates a ResourceLimitError with the correct properties', () => { + const error = new ResourceLimitError('Connection limit exceeded'); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.code).toBe(ErrorCode.ResourceLimitError); + expect(error.message).toBe('Connection limit exceeded'); + expect(error.data).toBeUndefined(); + }); + + it('creates a ResourceLimitError with connection limit data', () => { + const error = new ResourceLimitError('Connection limit exceeded', { + data: { + limitType: 'connection', + current: 100, + limit: 100, + }, + }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.code).toBe(ErrorCode.ResourceLimitError); + expect(error.message).toBe('Connection limit exceeded'); + expect(error.data).toStrictEqual({ + limitType: 'connection', + current: 100, + limit: 100, + }); + }); + + 
it('creates a ResourceLimitError with message size limit data', () => { + const error = new ResourceLimitError('Message size limit exceeded', { + data: { + limitType: 'messageSize', + current: 1048577, + limit: 1048576, + }, + }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.code).toBe(ErrorCode.ResourceLimitError); + expect(error.message).toBe('Message size limit exceeded'); + expect(error.data).toStrictEqual({ + limitType: 'messageSize', + current: 1048577, + limit: 1048576, + }); + }); + + it('creates a ResourceLimitError with partial data', () => { + const error = new ResourceLimitError('Resource limit exceeded', { + data: { + limitType: 'connection', + }, + }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.data).toStrictEqual({ + limitType: 'connection', + }); + }); + + it('creates a ResourceLimitError with a cause', () => { + const cause = new Error('Original error'); + const error = new ResourceLimitError('Resource limit exceeded', { cause }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.code).toBe(ErrorCode.ResourceLimitError); + expect(error.cause).toBe(cause); + }); + + it('creates a ResourceLimitError with a custom stack', () => { + const customStack = 'custom stack trace'; + const error = new ResourceLimitError('Resource limit exceeded', { + stack: customStack, + }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.stack).toBe(customStack); + }); + + it('unmarshals a valid marshaled ResourceLimitError with connection limit data', () => { + const marshaledError: MarshaledOcapError = { + [ErrorSentinel]: true, + message: 'Connection limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'connection', + current: 100, + limit: 100, + }, + stack: 'stack trace', + }; + + const unmarshaledError = ResourceLimitError.unmarshal( + marshaledError, + unmarshalErrorOptions, + ); + expect(unmarshaledError).toBeInstanceOf(ResourceLimitError); + 
expect(unmarshaledError.code).toBe(ErrorCode.ResourceLimitError); + expect(unmarshaledError.message).toBe('Connection limit exceeded'); + expect(unmarshaledError.stack).toBe('stack trace'); + expect(unmarshaledError.data).toStrictEqual({ + limitType: 'connection', + current: 100, + limit: 100, + }); + }); + + it('unmarshals a valid marshaled ResourceLimitError with message size limit data', () => { + const marshaledError: MarshaledOcapError = { + [ErrorSentinel]: true, + message: 'Message size limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'messageSize', + current: 1048577, + limit: 1048576, + }, + stack: 'stack trace', + }; + + const unmarshaledError = ResourceLimitError.unmarshal( + marshaledError, + unmarshalErrorOptions, + ); + expect(unmarshaledError).toBeInstanceOf(ResourceLimitError); + expect(unmarshaledError.code).toBe(ErrorCode.ResourceLimitError); + expect(unmarshaledError.message).toBe('Message size limit exceeded'); + expect(unmarshaledError.data).toStrictEqual({ + limitType: 'messageSize', + current: 1048577, + limit: 1048576, + }); + }); + + it('unmarshals a valid marshaled ResourceLimitError without data', () => { + const marshaledError = { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: ErrorCode.ResourceLimitError, + stack: 'stack trace', + } as unknown as MarshaledOcapError; + + const unmarshaledError = ResourceLimitError.unmarshal( + marshaledError, + unmarshalErrorOptions, + ); + expect(unmarshaledError).toBeInstanceOf(ResourceLimitError); + expect(unmarshaledError.code).toBe(ErrorCode.ResourceLimitError); + expect(unmarshaledError.message).toBe('Resource limit exceeded'); + expect(unmarshaledError.data).toBeUndefined(); + }); + + it.each([ + { + name: 'invalid limitType value', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'invalid', + current: 100, + limit: 100, + }, + stack: 'stack trace', + 
} as unknown as MarshaledOcapError, + expectedError: + 'At path: data.limitType -- Expected the value to satisfy a union of `literal | literal`, but received: "invalid"', + }, + { + name: 'invalid current type', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'connection', + current: 'not a number', + limit: 100, + }, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: data.current -- Expected a number, but received: "not a number"', + }, + { + name: 'invalid limit type', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'connection', + current: 100, + limit: 'not a number', + }, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: data.limit -- Expected a number, but received: "not a number"', + }, + { + name: 'wrong error code', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: 'WRONG_ERROR_CODE' as ErrorCode, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: code -- Expected the literal `"RESOURCE_LIMIT_ERROR"`, but received: "WRONG_ERROR_CODE"', + }, + { + name: 'missing required fields', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + // Missing code field + } as unknown as MarshaledOcapError, + expectedError: + 'At path: code -- Expected the literal `"RESOURCE_LIMIT_ERROR"`, but received: undefined', + }, + ])( + 'throws an error when unmarshaling with $name', + ({ marshaledError, expectedError }) => { + expect(() => + ResourceLimitError.unmarshal(marshaledError, unmarshalErrorOptions), + ).toThrow(expectedError); + }, + ); +}); diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.ts b/packages/kernel-errors/src/errors/ResourceLimitError.ts new file mode 100644 index 
000000000..cec6fcef8
--- /dev/null
+++ b/packages/kernel-errors/src/errors/ResourceLimitError.ts
@@ -0,0 +1,86 @@
+import {
+  assert,
+  literal,
+  number,
+  object,
+  optional,
+  union,
+} from '@metamask/superstruct';
+
+import { BaseError } from '../BaseError.ts';
+import { marshaledErrorSchema, ErrorCode } from '../constants.ts';
+import type { ErrorOptionsWithStack, MarshaledOcapError } from '../types.ts';
+
+/**
+ * Optional details describing which limit was hit and by how much.
+ */
+type ResourceLimitData = {
+  limitType?: 'connection' | 'messageSize';
+  current?: number;
+  limit?: number;
+};
+
+/**
+ * Error tagged with `ErrorCode.ResourceLimitError`, optionally carrying
+ * details about the limit involved.
+ */
+export class ResourceLimitError extends BaseError {
+  /**
+   * Constructs a new ResourceLimitError.
+   *
+   * @param message - Description of the failure.
+   * @param options - Optional cause/stack, plus limit details under `data`.
+   */
+  constructor(
+    message: string,
+    options?: ErrorOptionsWithStack & { data?: ResourceLimitData },
+  ) {
+    super(ErrorCode.ResourceLimitError, message, { ...options });
+    harden(this);
+  }
+
+  /**
+   * Schema used to validate marshaled {@link ResourceLimitError} instances.
+   */
+  public static struct = object({
+    ...marshaledErrorSchema,
+    code: literal(ErrorCode.ResourceLimitError),
+    data: optional(
+      object({
+        limitType: optional(
+          union([literal('connection'), literal('messageSize')]),
+        ),
+        current: optional(number()),
+        limit: optional(number()),
+      }),
+    ),
+  });
+
+  /**
+   * Rebuilds a {@link ResourceLimitError} from its marshaled form.
+   *
+   * @param marshaledError - The marshaled error to unmarshal.
+   * @param unmarshalErrorOptions - Callback that derives the error options
+   * from the marshaled error.
+   * @returns The unmarshaled error.
+   * @throws If `marshaledError` fails validation against `struct`.
+   */
+  public static unmarshal(
+    marshaledError: MarshaledOcapError,
+    unmarshalErrorOptions: (
+      marshaledError: MarshaledOcapError,
+    ) => ErrorOptionsWithStack,
+  ): ResourceLimitError {
+    assert(marshaledError, this.struct);
+    const baseOptions = unmarshalErrorOptions(marshaledError);
+    const limitData = marshaledError.data as ResourceLimitData | undefined;
+    if (limitData === undefined) {
+      return new ResourceLimitError(marshaledError.message, { ...baseOptions });
+    }
+    return new ResourceLimitError(marshaledError.message, {
+      ...baseOptions,
+      data: limitData,
+    });
+  }
+}
+harden(ResourceLimitError);
diff --git a/packages/kernel-errors/src/errors/index.ts b/packages/kernel-errors/src/errors/index.ts
index adc182cb3..235abe9c4 100644
--- a/packages/kernel-errors/src/errors/index.ts
+++ b/packages/kernel-errors/src/errors/index.ts
@@ -1,6 +1,7 @@
 import { AbortError } from './AbortError.ts';
 import { DuplicateEndowmentError } from './DuplicateEndowmentError.ts';
 import { EvaluatorError } from './EvaluatorError.ts';
+import { ResourceLimitError } from './ResourceLimitError.ts';
 import { SampleGenerationError } from './SampleGenerationError.ts';
 import { StreamReadError } from './StreamReadError.ts';
 import { VatAlreadyExistsError } from './VatAlreadyExistsError.ts';
@@ -19,4 +20,5 @@ export const errorClasses = {
   [ErrorCode.SubclusterNotFound]: SubclusterNotFoundError,
   [ErrorCode.SampleGenerationError]: SampleGenerationError,
   [ErrorCode.InternalError]: EvaluatorError,
+  [ErrorCode.ResourceLimitError]: ResourceLimitError,
 } as const;
diff --git a/packages/kernel-errors/src/index.test.ts b/packages/kernel-errors/src/index.test.ts
index 0e55551ea..96a79688c 100644
--- a/packages/kernel-errors/src/index.test.ts
+++ b/packages/kernel-errors/src/index.test.ts
@@ -13,6 +13,7 @@ describe('index', () => {
       'EvaluatorError',
       'MarshaledErrorStruct',
       'MarshaledOcapErrorStruct',
+      'ResourceLimitError',
       'SampleGenerationError',
       'StreamReadError',
       'SubclusterNotFoundError',
diff --git
a/packages/kernel-errors/src/index.ts b/packages/kernel-errors/src/index.ts index 43981ae55..9f3e40694 100644 --- a/packages/kernel-errors/src/index.ts +++ b/packages/kernel-errors/src/index.ts @@ -8,6 +8,7 @@ export { VatNotFoundError } from './errors/VatNotFoundError.ts'; export { StreamReadError } from './errors/StreamReadError.ts'; export { SubclusterNotFoundError } from './errors/SubclusterNotFoundError.ts'; export { AbortError } from './errors/AbortError.ts'; +export { ResourceLimitError } from './errors/ResourceLimitError.ts'; export { ErrorCode, ErrorSentinel, diff --git a/packages/ocap-kernel/src/remotes/ConnectionFactory.test.ts b/packages/ocap-kernel/src/remotes/ConnectionFactory.test.ts index ff7841661..fe9bb320a 100644 --- a/packages/ocap-kernel/src/remotes/ConnectionFactory.test.ts +++ b/packages/ocap-kernel/src/remotes/ConnectionFactory.test.ts @@ -1111,6 +1111,54 @@ describe('ConnectionFactory', () => { }); }); + describe('closeChannel', () => { + it('closes underlying stream when close is available', async () => { + factory = await createFactory(); + const close = vi.fn().mockResolvedValue(undefined); + const channel = { + peerId: 'peer-close', + msgStream: { + unwrap: () => ({ close }), + }, + } as unknown as Channel; + await factory.closeChannel(channel, channel.peerId); + expect(close).toHaveBeenCalled(); + expect(mockLoggerLog).toHaveBeenCalledWith( + `${channel.peerId}:: closed channel stream`, + ); + }); + + it('aborts underlying stream when abort is available', async () => { + factory = await createFactory(); + const abort = vi.fn(); + const channel = { + peerId: 'peer-abort', + msgStream: { + unwrap: () => ({ abort }), + }, + } as unknown as Channel; + await factory.closeChannel(channel, channel.peerId); + expect(abort).toHaveBeenCalledWith(expect.any(AbortError)); + expect(mockLoggerLog).toHaveBeenCalledWith( + `${channel.peerId}:: aborted channel stream`, + ); + }); + + it('logs when underlying stream lacks close and abort', async () => { 
+      factory = await createFactory();
+      const channel = {
+        peerId: 'peer-none',
+        msgStream: {
+          unwrap: () => ({}),
+        },
+      } as unknown as Channel;
+      await factory.closeChannel(channel, channel.peerId);
+      expect(mockLoggerLog).toHaveBeenCalledWith(
+        `${channel.peerId}:: channel stream lacks close/abort, relying on natural closure`,
+      );
+    });
+  });
+
   describe('integration scenarios', () => {
     it('handles complete connection lifecycle', async () => {
       createLibp2p.mockImplementation(async () => ({
diff --git a/packages/ocap-kernel/src/remotes/ConnectionFactory.ts b/packages/ocap-kernel/src/remotes/ConnectionFactory.ts
index e833fd788..db3ffe2a0 100644
--- a/packages/ocap-kernel/src/remotes/ConnectionFactory.ts
+++ b/packages/ocap-kernel/src/remotes/ConnectionFactory.ts
@@ -323,6 +323,53 @@ export class ConnectionFactory {
     return promise;
   }
 
+  /**
+   * Close a channel's underlying stream to release network resources.
+   *
+   * Best-effort: prefers `close()`, falls back to `abort()`, and only logs
+   * when neither exists. Errors raised along the way are passed to
+   * `#outputError` instead of being rethrown.
+   *
+   * @param channel - The channel to close.
+   * @param peerId - The peer ID for logging.
+   */
+  async closeChannel(channel: Channel, peerId: string): Promise<void> {
+    try {
+      // ByteStream.unwrap() returns the underlying libp2p stream.
+      const maybeWrapper = channel.msgStream as unknown as {
+        unwrap?: () => unknown;
+      };
+      const underlying =
+        typeof maybeWrapper.unwrap === 'function'
+          ? maybeWrapper.unwrap()
+          : undefined;
+      const stream = underlying as
+        | { close?: () => Promise<void>; abort?: (error?: Error) => void }
+        | undefined;
+
+      // Prefer a graceful close; require a callable, as with `unwrap` above.
+      if (typeof stream?.close === 'function') {
+        await stream.close();
+        this.#logger.log(`${peerId}:: closed channel stream`);
+        return;
+      }
+
+      // No close(): abort the stream with an AbortError instead.
+      if (typeof stream?.abort === 'function') {
+        stream.abort(new AbortError());
+        this.#logger.log(`${peerId}:: aborted channel stream`);
+        return;
+      }
+
+      // If we cannot explicitly close/abort, log and rely on natural closure.
+      this.#logger.log(
+        `${peerId}:: channel stream lacks close/abort, relying on natural closure`,
+      );
+    } catch (problem) {
+      this.#outputError(peerId, 'closing channel stream', problem);
+    }
+  }
+
   /**
    * Output an error message.
    *
diff --git a/packages/ocap-kernel/src/remotes/network.test.ts b/packages/ocap-kernel/src/remotes/network.test.ts
index d051f9528..df55e88e4 100644
--- a/packages/ocap-kernel/src/remotes/network.test.ts
+++ b/packages/ocap-kernel/src/remotes/network.test.ts
@@ -1,5 +1,5 @@
-import { AbortError } from '@metamask/kernel-errors';
-import { makeAbortSignalMock } from '@ocap/repo-tools/test-utils';
+import { AbortError, ResourceLimitError } from '@metamask/kernel-errors';
+import { delay, makeAbortSignalMock } from '@ocap/repo-tools/test-utils';
 import {
   describe,
   expect,
@@ -60,6 +60,7 @@ const mockReconnectionManager = {
   resetBackoff: vi.fn(),
   resetAllBackoffs: vi.fn(),
   clear: vi.fn(),
+  clearPeer: vi.fn(),
 };
 
 vi.mock('./ReconnectionManager.ts', () => {
@@ -81,6 +82,8 @@ vi.mock('./ReconnectionManager.ts', () => {
     resetAllBackoffs = mockReconnectionManager.resetAllBackoffs;
 
     clear = mockReconnectionManager.clear;
+
+    clearPeer = mockReconnectionManager.clearPeer;
   }
   return {
     ReconnectionManager: MockReconnectionManager,
@@ -100,6 +103,7 @@ const mockConnectionFactory = {
   dialIdempotent: vi.fn(),
   onInboundConnection: vi.fn(),
   stop: vi.fn().mockResolvedValue(undefined),
+  closeChannel: vi.fn().mockResolvedValue(undefined),
 };
 
 vi.mock('./ConnectionFactory.ts', () => {
@@ -144,6 +148,12 @@ vi.mock('@metamask/kernel-errors', () => ({
       this.name = 'AbortError';
     }
   },
+  ResourceLimitError: class MockResourceLimitError extends Error {
+    constructor(message: string) {
+      super(message);
+      this.name = 'ResourceLimitError';
+    }
+  },
   isRetryableNetworkError: vi.fn().mockImplementation((error: unknown) => {
     const errorWithCode = error as { code?: string };
     return (
@@ -177,10 +187,12 @@ describe('network.initNetwork', () => {
mockReconnectionManager.resetBackoff.mockClear(); mockReconnectionManager.resetAllBackoffs.mockClear(); mockReconnectionManager.clear.mockClear(); + mockReconnectionManager.clearPeer.mockClear(); mockConnectionFactory.dialIdempotent.mockClear(); mockConnectionFactory.onInboundConnection.mockClear(); mockConnectionFactory.stop.mockClear(); + mockConnectionFactory.closeChannel.mockClear(); mockLogger.log.mockClear(); mockLogger.error.mockClear(); @@ -202,8 +214,10 @@ describe('network.initNetwork', () => { }); afterEach(() => { - // Clear mocks after each test - vi.clearAllMocks(); + if (vi.isFakeTimers()) { + vi.clearAllTimers(); + vi.useRealTimers(); + } }); const createMockChannel = (peerId: string): MockChannel => ({ @@ -1934,4 +1948,452 @@ describe('network.initNetwork', () => { expect(mockChannel.msgStream.write).toHaveBeenCalledTimes(2); }); }); + + describe('connection limit', () => { + it('enforces maximum concurrent connections', async () => { + const mockChannels: MockChannel[] = []; + // Create 100 mock channels + for (let i = 0; i < 100; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce(mockChannel); + } + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Establish 100 connections + for (let i = 0; i < 100; i += 1) { + await sendRemoteMessage(`peer-${i}`, 'msg'); + } + // Attempt to establish 101st connection should fail + await expect(sendRemoteMessage('peer-101', 'msg')).rejects.toThrow( + ResourceLimitError, + ); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledTimes(100); + }); + + it('respects custom maxConcurrentConnections option', async () => { + const customLimit = 5; + const mockChannels: MockChannel[] = []; + // Create mock channels up to custom limit + for (let i = 0; i < customLimit; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + 
mockConnectionFactory.dialIdempotent.mockResolvedValueOnce(mockChannel); + } + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { maxConcurrentConnections: customLimit }, + vi.fn(), + ); + // Establish connections up to custom limit + for (let i = 0; i < customLimit; i += 1) { + await sendRemoteMessage(`peer-${i}`, 'msg'); + } + // Attempt to establish connection beyond custom limit should fail + await expect(sendRemoteMessage('peer-exceed', 'msg')).rejects.toThrow( + ResourceLimitError, + ); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledTimes( + customLimit, + ); + }); + + it('rejects inbound connections when limit reached', async () => { + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation( + (handler) => { + inboundHandler = handler; + }, + ); + const mockChannels: MockChannel[] = []; + // Create 100 mock channels for outbound connections + for (let i = 0; i < 100; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce(mockChannel); + } + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Establish 100 outbound connections + for (let i = 0; i < 100; i += 1) { + await sendRemoteMessage(`peer-${i}`, 'msg'); + } + // Attempt inbound connection should be rejected + const inboundChannel = createMockChannel('inbound-peer'); + inboundHandler?.(inboundChannel); + // Should not add to channels (connection rejected) + expect(mockLogger.log).toHaveBeenCalledWith( + 'inbound-peer:: rejecting inbound connection due to connection limit', + ); + }); + }); + + describe('message size limit', () => { + it('rejects messages exceeding 1MB size limit', async () => { + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Create a message larger than 1MB + const largeMessage = 'x'.repeat(1024 * 1024 + 1); // 1MB + 1 byte + 
await expect(sendRemoteMessage('peer-1', largeMessage)).rejects.toThrow( + ResourceLimitError, + ); + expect(mockConnectionFactory.dialIdempotent).not.toHaveBeenCalled(); + expect(mockMessageQueue.enqueue).not.toHaveBeenCalled(); + }); + + it('allows messages at exactly 1MB size limit', async () => { + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Create a message exactly 1MB + const exactSizeMessage = 'x'.repeat(1024 * 1024); + await sendRemoteMessage('peer-1', exactSizeMessage); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalled(); + expect(mockChannel.msgStream.write).toHaveBeenCalled(); + }); + + it('validates message size before queueing during reconnection', async () => { + mockReconnectionManager.isReconnecting.mockReturnValue(true); + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Create a message larger than 1MB + const largeMessage = 'x'.repeat(1024 * 1024 + 1); + await expect(sendRemoteMessage('peer-1', largeMessage)).rejects.toThrow( + ResourceLimitError, + ); + // Should not queue the message + expect(mockMessageQueue.enqueue).not.toHaveBeenCalled(); + }); + + it('respects custom maxMessageSizeBytes option', async () => { + const customLimit = 500 * 1024; // 500KB + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { maxMessageSizeBytes: customLimit }, + vi.fn(), + ); + // Create a message larger than custom limit + const largeMessage = 'x'.repeat(customLimit + 1); + await expect(sendRemoteMessage('peer-1', largeMessage)).rejects.toThrow( + ResourceLimitError, + ); + // Create a message at exactly custom limit + const exactSizeMessage = 'x'.repeat(customLimit); + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + await sendRemoteMessage('peer-1', exactSizeMessage); + 
expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalled(); + }); + }); + + describe('stale peer cleanup', () => { + it('sets up periodic cleanup interval', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + await initNetwork('0x1234', {}, vi.fn()); + expect(setIntervalSpy).toHaveBeenCalledWith( + expect.any(Function), + 15 * 60 * 1000, + ); + expect(intervalFn).toBeDefined(); + setIntervalSpy.mockRestore(); + }); + + it('cleans up interval on stop', async () => { + const clearIntervalSpy = vi.spyOn(global, 'clearInterval'); + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((_fn: () => void, _ms?: number) => { + return 42 as unknown as NodeJS.Timeout; + }); + const { stop } = await initNetwork('0x1234', {}, vi.fn()); + await stop(); + expect(clearIntervalSpy).toHaveBeenCalledWith(42); + setIntervalSpy.mockRestore(); + clearIntervalSpy.mockRestore(); + }); + + it('does not clean up peers with active connections', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Establish connection (sets lastConnectionTime) + await sendRemoteMessage('peer-1', 'msg'); + // Run cleanup immediately; should not remove active peer + intervalFn?.(); + await sendRemoteMessage('peer-1', 'msg2'); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledTimes(1); + setIntervalSpy.mockRestore(); + }); + + it('does not clean up peers currently reconnecting', async () => { + 
let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + mockReconnectionManager.isReconnecting.mockReturnValue(true); + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + await sendRemoteMessage('peer-1', 'msg'); + // Run cleanup immediately; reconnecting peer should not be cleaned + intervalFn?.(); + expect(mockMessageQueue.enqueue).toHaveBeenCalledWith('msg'); + setIntervalSpy.mockRestore(); + }); + + it('cleans up stale peers and calls clearPeer', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const mockChannel = createMockChannel('peer-1'); + // End the inbound stream so the channel is removed from the active channels map. + // Stale cleanup only applies when there is no active channel. + mockChannel.msgStream.read.mockResolvedValueOnce(undefined); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const stalePeerTimeoutMs = 1; + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { stalePeerTimeoutMs }, + vi.fn(), + ); + // Establish connection (sets lastConnectionTime) + await sendRemoteMessage('peer-1', 'msg'); + // Wait until readChannel processes the stream end and removes the channel. + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith('peer-1:: stream ended'); + }); + // Ensure enough wall-clock time passes to exceed stalePeerTimeoutMs. 
+ await delay(stalePeerTimeoutMs + 5); + // Run cleanup; stale peer should be cleaned + intervalFn?.(); + // Verify clearPeer was called + expect(mockReconnectionManager.clearPeer).toHaveBeenCalledWith('peer-1'); + // Verify cleanup log message + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('peer-1:: cleaning up stale peer data'), + ); + setIntervalSpy.mockRestore(); + }); + + it('respects custom cleanupIntervalMs option', async () => { + const customInterval = 30 * 60 * 1000; // 30 minutes + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((_fn: () => void, _ms?: number) => { + return 1 as unknown as NodeJS.Timeout; + }); + await initNetwork( + '0x1234', + { cleanupIntervalMs: customInterval }, + vi.fn(), + ); + expect(setIntervalSpy).toHaveBeenCalledWith( + expect.any(Function), + customInterval, + ); + setIntervalSpy.mockRestore(); + }); + + it('respects custom stalePeerTimeoutMs option', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const customTimeout = 50; + const mockChannel = createMockChannel('peer-1'); + // End the inbound stream so the channel is removed from the active channels map. + mockChannel.msgStream.read.mockResolvedValueOnce(undefined); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { + stalePeerTimeoutMs: customTimeout, + }, + vi.fn(), + ); + // Establish connection + await sendRemoteMessage('peer-1', 'msg'); + // Wait until readChannel processes the stream end and removes the channel. + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith('peer-1:: stream ended'); + }); + // Run cleanup quickly; peer should not be stale yet. 
+ intervalFn?.(); + // Peer should not be cleaned (not stale yet) + expect(mockReconnectionManager.clearPeer).not.toHaveBeenCalled(); + // Wait beyond the custom timeout, then run cleanup again. + await delay(customTimeout + 10); + intervalFn?.(); + // Now peer should be cleaned + expect(mockReconnectionManager.clearPeer).toHaveBeenCalledWith('peer-1'); + setIntervalSpy.mockRestore(); + }); + }); + + describe('reconnection respects connection limit', () => { + it('blocks reconnection when connection limit is reached', async () => { + const customLimit = 2; + const mockChannels: MockChannel[] = []; + // Create mock channels + for (let i = 0; i < customLimit; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + } + // Set up reconnection state + let reconnecting = false; + mockReconnectionManager.isReconnecting.mockImplementation( + () => reconnecting, + ); + mockReconnectionManager.startReconnection.mockImplementation(() => { + reconnecting = true; + }); + mockReconnectionManager.stopReconnection.mockImplementation(() => { + reconnecting = false; + }); + mockReconnectionManager.shouldRetry.mockReturnValue(true); + mockReconnectionManager.incrementAttempt.mockReturnValue(1); + mockReconnectionManager.calculateBackoff.mockReturnValue(100); // Small delay to ensure ordering + const { abortableDelay } = await import('@metamask/kernel-utils'); + (abortableDelay as ReturnType).mockImplementation( + async (ms: number) => { + // Use real delay to allow other operations to complete + await new Promise((resolve) => setTimeout(resolve, ms)); + }, + ); + // Set up dial mocks - initial connections + mockConnectionFactory.dialIdempotent + .mockResolvedValueOnce(mockChannels[0]) // peer-0 + .mockResolvedValueOnce(mockChannels[1]); // peer-1 + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { maxConcurrentConnections: customLimit }, + vi.fn(), + ); + // Establish connections up to limit + await 
sendRemoteMessage('peer-0', 'msg'); + await sendRemoteMessage('peer-1', 'msg'); + // Disconnect peer-0 (simulate connection loss) + const peer0Channel = mockChannels[0] as MockChannel; + peer0Channel.msgStream.write.mockRejectedValueOnce( + Object.assign(new Error('Connection lost'), { code: 'ECONNRESET' }), + ); + // Trigger reconnection for peer-0 (this will remove peer-0 from channels) + await sendRemoteMessage('peer-0', 'msg2'); + // Wait for connection loss to be handled (channel removed) + await vi.waitFor( + () => { + expect( + mockReconnectionManager.startReconnection, + ).toHaveBeenCalledWith('peer-0'); + }, + { timeout: 1000 }, + ); + // Now fill the connection limit with a new peer (peer-0 is removed, so we have space) + // Ensure new-peer is NOT in reconnecting state + mockReconnectionManager.isReconnecting.mockImplementation((peerId) => { + return peerId === 'peer-0'; // Only peer-0 is reconnecting + }); + const newPeerChannel = createMockChannel('new-peer'); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce( + newPeerChannel, + ); + await sendRemoteMessage('new-peer', 'msg'); + // Wait a bit to ensure new-peer connection is fully established in channels map + await delay(50); + // Mock successful dial for reconnection attempt (but limit will block it) + const reconnectChannel = createMockChannel('peer-0'); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce( + reconnectChannel, + ); + // Verify reconnection started + expect(mockReconnectionManager.startReconnection).toHaveBeenCalledWith( + 'peer-0', + ); + // Wait for reconnection attempt to be blocked + await vi.waitFor( + () => { + // Should have logged that reconnection was blocked by limit + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining( + 'peer-0:: reconnection blocked by connection limit', + ), + ); + // Verify closeChannel was called to release network resources + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + reconnectChannel, + 
'peer-0', + ); + }, + { timeout: 5000 }, + ); + // Verify reconnection continues (doesn't stop) - shouldRetry should be called + // meaning the loop continues after the limit check fails + expect(mockReconnectionManager.shouldRetry).toHaveBeenCalled(); + }, 10000); + }); + + describe('connection limit race condition', () => { + it('prevents exceeding limit when multiple concurrent dials occur', async () => { + const customLimit = 2; + const mockChannels: MockChannel[] = []; + + // Create mock channels + for (let i = 0; i < customLimit + 1; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + } + + // Set up dial mocks - all dials will succeed + mockConnectionFactory.dialIdempotent.mockImplementation( + async (peerId: string) => { + // Simulate async dial delay + await delay(10); + return mockChannels.find((ch) => ch.peerId === peerId) as MockChannel; + }, + ); + + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { maxConcurrentConnections: customLimit }, + vi.fn(), + ); + // Start multiple concurrent dials that all pass the initial limit check + const sendPromises = Promise.all([ + sendRemoteMessage('peer-0', 'msg0'), + sendRemoteMessage('peer-1', 'msg1'), + sendRemoteMessage('peer-2', 'msg2'), // This should be blocked after dial + ]); + await sendPromises; + // Verify that only 2 channels were added (the limit) + // The third one should have been rejected after dial completed + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('peer-2:: connection limit reached after dial'), + ); + // Verify that peer-2's message was queued + expect(mockMessageQueue.enqueue).toHaveBeenCalledWith('msg2'); + // Verify that reconnection was started for peer-2 (to retry later) + expect(mockReconnectionManager.startReconnection).toHaveBeenCalledWith( + 'peer-2', + ); + }, 10000); + }); }); diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 
fb6cebadf..20f20a005 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -1,4 +1,8 @@ -import { AbortError, isRetryableNetworkError } from '@metamask/kernel-errors'; +import { + AbortError, + isRetryableNetworkError, + ResourceLimitError, +} from '@metamask/kernel-errors'; import { abortableDelay, DEFAULT_MAX_RETRY_ATTEMPTS, @@ -22,6 +26,18 @@ import type { /** Default upper bound for queued outbound messages while reconnecting */ const DEFAULT_MAX_QUEUE = 200; +/** Default maximum number of concurrent connections */ +const DEFAULT_MAX_CONCURRENT_CONNECTIONS = 100; + +/** Default maximum message size in bytes (1MB) */ +const DEFAULT_MAX_MESSAGE_SIZE_BYTES = 1024 * 1024; + +/** Default stale peer cleanup interval in milliseconds (15 minutes) */ +const DEFAULT_CLEANUP_INTERVAL_MS = 15 * 60 * 1000; + +/** Default stale peer timeout in milliseconds (1 hour) */ +const DEFAULT_STALE_PEER_TIMEOUT_MS = 60 * 60 * 1000; + /** * Initialize the remote comm system with information that must be provided by the kernel. * @@ -30,6 +46,10 @@ const DEFAULT_MAX_QUEUE = 200; * @param options.relays - PeerIds/Multiaddrs of known message relays. * @param options.maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default). * @param options.maxQueue - Maximum number of messages to queue per peer while reconnecting (default: 200). + * @param options.maxConcurrentConnections - Maximum number of concurrent connections (default: 100). + * @param options.maxMessageSizeBytes - Maximum message size in bytes (default: 1MB). + * @param options.cleanupIntervalMs - Stale peer cleanup interval in milliseconds (default: 15 minutes). + * @param options.stalePeerTimeoutMs - Stale peer timeout in milliseconds (default: 1 hour). * @param remoteMessageHandler - Handler to be called when messages are received from elsewhere. 
* @param onRemoteGiveUp - Optional callback to be called when we give up on a remote (after max retries or non-retryable error). * @@ -51,6 +71,10 @@ export async function initNetwork( relays = [], maxRetryAttempts, maxQueue = DEFAULT_MAX_QUEUE, + maxConcurrentConnections = DEFAULT_MAX_CONCURRENT_CONNECTIONS, + maxMessageSizeBytes = DEFAULT_MAX_MESSAGE_SIZE_BYTES, + cleanupIntervalMs = DEFAULT_CLEANUP_INTERVAL_MS, + stalePeerTimeoutMs = DEFAULT_STALE_PEER_TIMEOUT_MS, } = options; let cleanupWakeDetector: (() => void) | undefined; const stopController = new AbortController(); @@ -60,6 +84,7 @@ export async function initNetwork( const reconnectionManager = new ReconnectionManager(); const messageQueues = new Map(); // One queue per peer const intentionallyClosed = new Set(); // Track peers that intentionally closed connections + const lastConnectionTime = new Map(); // Track last connection time for cleanup const connectionFactory = await ConnectionFactory.make( keySeed, relays, @@ -68,6 +93,8 @@ export async function initNetwork( maxRetryAttempts, ); const locationHints = new Map(); + let cleanupIntervalId: ReturnType | undefined; + /** * Output an error message. 
* @@ -191,6 +218,7 @@ export async function initNetwork( if (readBuf) { reconnectionManager.resetBackoff(channel.peerId); // successful inbound traffic await receiveMessage(channel.peerId, bufToString(readBuf.subarray())); + lastConnectionTime.set(channel.peerId, Date.now()); // update timestamp on inbound activity } else { // Stream ended (returned undefined), exit the read loop logger.log(`${channel.peerId}:: stream ended`); @@ -249,9 +277,7 @@ export async function initNetwork( logger.log( `${peerId}:: max reconnection attempts (${maxAttempts}) reached, giving up`, ); - reconnectionManager.stopReconnection(peerId); - queue.clear(); - onRemoteGiveUp?.(peerId); + giveUpOnPeer(peerId, queue); return; } @@ -283,15 +309,32 @@ export async function initNetwork( false, // No retry here, we're already in a retry loop ); - // Add channel to manager - channels.set(peerId, channel); + // Check connection limit before adding channel + // This prevents exceeding the limit if other connections were established + // during the reconnection delay + try { + checkConnectionLimit(); + } catch (limitError) { + // Connection limit reached - treat as retryable and continue loop + // The limit might free up when other connections close + logger.log( + `${peerId}:: reconnection blocked by connection limit, will retry`, + ); + outputError( + peerId, + `reconnection attempt ${nextAttempt}`, + limitError, + ); + // Explicitly close the channel to release network resources + await connectionFactory.closeChannel(channel, peerId); + // Continue the reconnection loop + continue; + } - logger.log(`${peerId}:: reconnection successful`); + // Register channel and start reading + registerChannel(peerId, channel); - // Start reading from the new channel - readChannel(channel).catch((problem) => { - outputError(peerId, `reading channel to`, problem); - }); + logger.log(`${peerId}:: reconnection successful`); // Flush queued messages await flushQueuedMessages(peerId, channel, queue); @@ -315,9 
+358,7 @@ export async function initNetwork( } if (!isRetryableNetworkError(problem)) { outputError(peerId, `non-retryable failure`, problem); - reconnectionManager.stopReconnection(peerId); - queue.clear(); - onRemoteGiveUp?.(peerId); + giveUpOnPeer(peerId, queue); return; } outputError(peerId, `reconnection attempt ${nextAttempt}`, problem); @@ -368,6 +409,129 @@ export async function initNetwork( } } + /** + * Validate message size before sending or queuing. + * + * @param message - The message to validate. + * @throws ResourceLimitError if message exceeds size limit. + */ + function validateMessageSize(message: string): void { + const messageSizeBytes = new TextEncoder().encode(message).length; + if (messageSizeBytes > maxMessageSizeBytes) { + throw new ResourceLimitError( + `Message size ${messageSizeBytes} bytes exceeds limit of ${maxMessageSizeBytes} bytes`, + { + data: { + limitType: 'messageSize', + current: messageSizeBytes, + limit: maxMessageSizeBytes, + }, + }, + ); + } + } + + /** + * Check if we can establish a new connection (within connection limit). + * + * @throws ResourceLimitError if connection limit is reached. + */ + function checkConnectionLimit(): void { + const currentConnections = channels.size; + if (currentConnections >= maxConcurrentConnections) { + throw new ResourceLimitError( + `Connection limit reached: ${currentConnections}/${maxConcurrentConnections} concurrent connections`, + { + data: { + limitType: 'connection', + current: currentConnections, + limit: maxConcurrentConnections, + }, + }, + ); + } + } + + /** + * Register a channel and start reading from it. + * + * @param peerId - The peer ID for the channel. + * @param channel - The channel to register. + * @param errorContext - Optional context for error messages when reading fails. 
+ */ + function registerChannel( + peerId: string, + channel: Channel, + errorContext = 'reading channel to', + ): void { + channels.set(peerId, channel); + lastConnectionTime.set(peerId, Date.now()); + readChannel(channel).catch((problem) => { + outputError(peerId, errorContext, problem); + }); + } + + /** + * Give up on a peer after max retries or non-retryable error. + * + * @param peerId - The peer ID to give up on. + * @param queue - The message queue for the peer. + */ + function giveUpOnPeer(peerId: string, queue: MessageQueue): void { + reconnectionManager.stopReconnection(peerId); + queue.clear(); + onRemoteGiveUp?.(peerId); + } + + /** + * Clean up stale peer data for peers disconnected for more than 1 hour. + */ + function cleanupStalePeers(): void { + const now = Date.now(); + const stalePeers: string[] = []; + + // Check all tracked peers + for (const [peerId, lastTime] of lastConnectionTime.entries()) { + const timeSinceLastConnection = now - lastTime; + const hasActiveChannel = channels.has(peerId); + const isReconnecting = reconnectionManager.isReconnecting(peerId); + + // Consider peer stale if: + // - No active channel + // - Not currently reconnecting + // - Disconnected for more than stalePeerTimeoutMs + if ( + !hasActiveChannel && + !isReconnecting && + timeSinceLastConnection > stalePeerTimeoutMs + ) { + stalePeers.push(peerId); + } + } + + // Clean up stale peer data + for (const peerId of stalePeers) { + const lastTime = lastConnectionTime.get(peerId); + if (lastTime !== undefined) { + const minutesSinceDisconnect = Math.round((now - lastTime) / 1000 / 60); + logger.log( + `${peerId}:: cleaning up stale peer data (disconnected for ${minutesSinceDisconnect} minutes)`, + ); + } + + // Remove from all tracking structures + lastConnectionTime.delete(peerId); + messageQueues.delete(peerId); + locationHints.delete(peerId); + + // Clear reconnection state if any + if (reconnectionManager.isReconnecting(peerId)) { + 
reconnectionManager.stopReconnection(peerId); + } + reconnectionManager.clearPeer(peerId); + } + } + /** * Send a message to a peer. * @@ -382,6 +546,9 @@ export async function initNetwork( return; } + // Validate message size before processing + validateMessageSize(message); + // Check if peer is intentionally closed if (intentionallyClosed.has(targetPeerId)) { throw new Error('Message delivery failed after intentional close'); @@ -400,6 +567,10 @@ export async function initNetwork( let channel = channels.get(targetPeerId); if (!channel) { + // Check connection limit before dialing new connection + // (Early check to fail fast, but we'll check again after dial to prevent race conditions) + checkConnectionLimit(); + try { const hints = locationHints.get(targetPeerId) ?? []; channel = await connectionFactory.dialIdempotent( @@ -418,23 +589,45 @@ export async function initNetwork( return; } - channels.set(targetPeerId, channel); + // Check if a concurrent call already registered a channel for this peer + // (dialIdempotent may return the same channel due to deduplication) + const existingChannel = channels.get(targetPeerId); + if (existingChannel) { + // Another concurrent call already registered the channel, use it + channel = existingChannel; + } else { + // Re-check connection limit after dial completes to prevent race conditions + // Multiple concurrent dials could all pass the initial check, then all add channels + try { + checkConnectionLimit(); + } catch { + // Connection limit reached - close the dialed channel and queue the message + logger.log( + `${targetPeerId}:: connection limit reached after dial, queueing message`, + ); + // Explicitly close the channel to release network resources + await connectionFactory.closeChannel(channel, targetPeerId); + queue.enqueue(message); + // Start reconnection to retry later when limit might free up + handleConnectionLoss(targetPeerId); + return; + } + + registerChannel(targetPeerId, channel); + } } catch (problem) { 
outputError(targetPeerId, `opening connection`, problem); handleConnectionLoss(targetPeerId); queue.enqueue(message); return; } - - readChannel(channel).catch((problem) => { - outputError(targetPeerId, `reading channel to`, problem); - }); } try { logger.log(`${targetPeerId}:: send ${message}`); await writeWithTimeout(channel, fromString(message), 10_000); reconnectionManager.resetBackoff(targetPeerId); + lastConnectionTime.set(targetPeerId, Date.now()); } catch (problem) { outputError(targetPeerId, `sending message`, problem); handleConnectionLoss(targetPeerId); @@ -460,15 +653,47 @@ export async function initNetwork( // Don't add to channels map and don't start reading - connection will naturally close return; } - channels.set(channel.peerId, channel); - readChannel(channel).catch((error) => { - outputError(channel.peerId, 'error in inbound channel read', error); - }); + + // Check connection limit for inbound connections only if no existing channel + // If a channel already exists, this is likely a reconnection and the peer already has a slot + if (!channels.has(channel.peerId)) { + try { + checkConnectionLimit(); + } catch { + logger.log( + `${channel.peerId}:: rejecting inbound connection due to connection limit`, + ); + // Explicitly close the channel to release network resources + const closePromise = connectionFactory.closeChannel( + channel, + channel.peerId, + ); + if (typeof closePromise?.catch === 'function') { + closePromise.catch((problem) => { + outputError( + channel.peerId, + 'closing rejected inbound channel', + problem, + ); + }); + } + return; + } + } + + registerChannel(channel.peerId, channel, 'error in inbound channel read'); }); // Install wake detector to reset backoff on sleep/wake cleanupWakeDetector = installWakeDetector(handleWakeFromSleep); + // Start periodic cleanup task for stale peers + cleanupIntervalId = setInterval(() => { + if (!signal.aborted) { + cleanupStalePeers(); + } + }, cleanupIntervalMs); + /** * Explicitly close a 
connection to a peer. * Marks the peer as intentionally closed to prevent automatic reconnection. @@ -541,12 +766,18 @@ export async function initNetwork( cleanupWakeDetector(); cleanupWakeDetector = undefined; } + // Stop cleanup interval + if (cleanupIntervalId) { + clearInterval(cleanupIntervalId); + cleanupIntervalId = undefined; + } stopController.abort(); // cancels all delays and dials await connectionFactory.stop(); channels.clear(); reconnectionManager.clear(); messageQueues.clear(); intentionallyClosed.clear(); + lastConnectionTime.clear(); } // Return the sender with a stop handle and connection management functions diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index 2b895fbb2..66f51c626 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -44,6 +44,26 @@ export type RemoteCommsOptions = { * If not provided, uses the default MAX_QUEUE value. */ maxQueue?: number | undefined; + /** + * Maximum number of concurrent connections. + * If not provided, uses the default MAX_CONCURRENT_CONNECTIONS value (100). + */ + maxConcurrentConnections?: number | undefined; + /** + * Maximum message size in bytes. + * If not provided, uses the default MAX_MESSAGE_SIZE_BYTES value (1MB). + */ + maxMessageSizeBytes?: number | undefined; + /** + * Stale peer cleanup interval in milliseconds. + * If not provided, uses the default CLEANUP_INTERVAL_MS value (15 minutes). + */ + cleanupIntervalMs?: number | undefined; + /** + * Stale peer timeout in milliseconds (time before a disconnected peer is considered stale). + * If not provided, uses the default STALE_PEER_TIMEOUT_MS value (1 hour). 
+ */ + stalePeerTimeoutMs?: number | undefined; }; export type RemoteInfo = { diff --git a/vitest.config.ts b/vitest.config.ts index 607ab8890..01caf03e9 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -82,7 +82,7 @@ export default defineConfig({ 'packages/kernel-agents/**': { statements: 94.15, functions: 94.64, - branches: 91.39, + branches: 91.42, lines: 94.15, }, 'packages/kernel-browser-runtime/**': { @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.26, + branches: 93.25, lines: 97.57, }, 'packages/kernel-utils/**': { @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.53, - functions: 98.54, - branches: 97.59, - lines: 96.53, + statements: 96.42, + functions: 98.56, + branches: 97.27, + lines: 96.42, }, 'packages/omnium-gatherum/**': { statements: 5.67,