From 0e4059f2c346fb20044e07baa7fd3247178e6858 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 18 Dec 2025 15:37:50 +0100 Subject: [PATCH 1/8] feat(ocap-kernel): add resource limits for remote communications - Add connection limit (default 100 concurrent connections) - Add message size limit (default 1MB per message) - Add stale peer cleanup (removes data for peers disconnected >1 hour) - Make all limits configurable via RemoteCommsOptions - Add ResourceLimitError for limit violations - Add comprehensive tests for all resource limits This prevents memory exhaustion and manages system resources by: - Rejecting new connections when limit is reached - Rejecting messages exceeding size limit - Periodically cleaning up stale peer data --- packages/kernel-errors/src/constants.ts | 1 + .../src/errors/ResourceLimitError.ts | 66 ++++++ packages/kernel-errors/src/errors/index.ts | 2 + packages/kernel-errors/src/index.test.ts | 1 + packages/kernel-errors/src/index.ts | 1 + .../ocap-kernel/src/remotes/network.test.ts | 219 +++++++++++++++++- packages/ocap-kernel/src/remotes/network.ts | 156 ++++++++++++- packages/ocap-kernel/src/remotes/types.ts | 20 ++ vitest.config.ts | 16 +- 9 files changed, 470 insertions(+), 12 deletions(-) create mode 100644 packages/kernel-errors/src/errors/ResourceLimitError.ts diff --git a/packages/kernel-errors/src/constants.ts b/packages/kernel-errors/src/constants.ts index 2c98f6b43..fc5c55f8d 100644 --- a/packages/kernel-errors/src/constants.ts +++ b/packages/kernel-errors/src/constants.ts @@ -33,6 +33,7 @@ export const ErrorCode = { SubclusterNotFound: 'SUBCLUSTER_NOT_FOUND', SampleGenerationError: 'SAMPLE_GENERATION_ERROR', InternalError: 'INTERNAL_ERROR', + ResourceLimitError: 'RESOURCE_LIMIT_ERROR', } as const; export type ErrorCode = (typeof ErrorCode)[keyof typeof ErrorCode]; diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.ts b/packages/kernel-errors/src/errors/ResourceLimitError.ts new file mode 100644 
index 000000000..1d11c8f14 --- /dev/null +++ b/packages/kernel-errors/src/errors/ResourceLimitError.ts @@ -0,0 +1,66 @@ +import { + assert, + literal, + number, + object, + optional, + union, +} from '@metamask/superstruct'; + +import { BaseError } from '../BaseError.ts'; +import { marshaledErrorSchema, ErrorCode } from '../constants.ts'; +import type { ErrorOptionsWithStack, MarshaledOcapError } from '../types.ts'; + +export class ResourceLimitError extends BaseError { + constructor( + message: string, + options?: ErrorOptionsWithStack & { + data?: { + limitType?: 'connection' | 'messageSize'; + current?: number; + limit?: number; + }; + }, + ) { + super(ErrorCode.ResourceLimitError, message, { + ...options, + }); + harden(this); + } + + /** + * A superstruct struct for validating marshaled {@link ResourceLimitError} instances. + */ + public static struct = object({ + ...marshaledErrorSchema, + code: literal(ErrorCode.ResourceLimitError), + data: optional( + object({ + limitType: optional( + union([literal('connection'), literal('messageSize')]), + ), + current: optional(number()), + limit: optional(number()), + }), + ), + }); + + /** + * Unmarshals a {@link MarshaledError} into a {@link ResourceLimitError}. + * + * @param marshaledError - The marshaled error to unmarshal. + * @param unmarshalErrorOptions - The function to unmarshal the error options. + * @returns The unmarshaled error. 
+ */ + public static unmarshal( + marshaledError: MarshaledOcapError, + unmarshalErrorOptions: ( + marshaledError: MarshaledOcapError, + ) => ErrorOptionsWithStack, + ): ResourceLimitError { + assert(marshaledError, this.struct); + const options = unmarshalErrorOptions(marshaledError); + return new ResourceLimitError(marshaledError.message, options); + } +} +harden(ResourceLimitError); diff --git a/packages/kernel-errors/src/errors/index.ts b/packages/kernel-errors/src/errors/index.ts index adc182cb3..235abe9c4 100644 --- a/packages/kernel-errors/src/errors/index.ts +++ b/packages/kernel-errors/src/errors/index.ts @@ -1,6 +1,7 @@ import { AbortError } from './AbortError.ts'; import { DuplicateEndowmentError } from './DuplicateEndowmentError.ts'; import { EvaluatorError } from './EvaluatorError.ts'; +import { ResourceLimitError } from './ResourceLimitError.ts'; import { SampleGenerationError } from './SampleGenerationError.ts'; import { StreamReadError } from './StreamReadError.ts'; import { VatAlreadyExistsError } from './VatAlreadyExistsError.ts'; @@ -19,4 +20,5 @@ export const errorClasses = { [ErrorCode.SubclusterNotFound]: SubclusterNotFoundError, [ErrorCode.SampleGenerationError]: SampleGenerationError, [ErrorCode.InternalError]: EvaluatorError, + [ErrorCode.ResourceLimitError]: ResourceLimitError, } as const; diff --git a/packages/kernel-errors/src/index.test.ts b/packages/kernel-errors/src/index.test.ts index 0e55551ea..96a79688c 100644 --- a/packages/kernel-errors/src/index.test.ts +++ b/packages/kernel-errors/src/index.test.ts @@ -13,6 +13,7 @@ describe('index', () => { 'EvaluatorError', 'MarshaledErrorStruct', 'MarshaledOcapErrorStruct', + 'ResourceLimitError', 'SampleGenerationError', 'StreamReadError', 'SubclusterNotFoundError', diff --git a/packages/kernel-errors/src/index.ts b/packages/kernel-errors/src/index.ts index 43981ae55..9f3e40694 100644 --- a/packages/kernel-errors/src/index.ts +++ b/packages/kernel-errors/src/index.ts @@ -8,6 +8,7 @@ export 
{ VatNotFoundError } from './errors/VatNotFoundError.ts'; export { StreamReadError } from './errors/StreamReadError.ts'; export { SubclusterNotFoundError } from './errors/SubclusterNotFoundError.ts'; export { AbortError } from './errors/AbortError.ts'; +export { ResourceLimitError } from './errors/ResourceLimitError.ts'; export { ErrorCode, ErrorSentinel, diff --git a/packages/ocap-kernel/src/remotes/network.test.ts b/packages/ocap-kernel/src/remotes/network.test.ts index d051f9528..7388338a6 100644 --- a/packages/ocap-kernel/src/remotes/network.test.ts +++ b/packages/ocap-kernel/src/remotes/network.test.ts @@ -1,4 +1,4 @@ -import { AbortError } from '@metamask/kernel-errors'; +import { AbortError, ResourceLimitError } from '@metamask/kernel-errors'; import { makeAbortSignalMock } from '@ocap/repo-tools/test-utils'; import { describe, @@ -144,6 +144,12 @@ vi.mock('@metamask/kernel-errors', () => ({ this.name = 'AbortError'; } }, + ResourceLimitError: class MockResourceLimitError extends Error { + constructor(message: string) { + super(message); + this.name = 'ResourceLimitError'; + } + }, isRetryableNetworkError: vi.fn().mockImplementation((error: unknown) => { const errorWithCode = error as { code?: string }; return ( @@ -202,8 +208,10 @@ describe('network.initNetwork', () => { }); afterEach(() => { - // Clear mocks after each test - vi.clearAllMocks(); + if (vi.isFakeTimers()) { + vi.clearAllTimers(); + vi.useRealTimers(); + } }); const createMockChannel = (peerId: string): MockChannel => ({ @@ -1934,4 +1942,209 @@ describe('network.initNetwork', () => { expect(mockChannel.msgStream.write).toHaveBeenCalledTimes(2); }); }); + + describe('connection limit', () => { + it('enforces maximum concurrent connections', async () => { + const mockChannels: MockChannel[] = []; + // Create 100 mock channels + for (let i = 0; i < 100; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + 
mockConnectionFactory.dialIdempotent.mockResolvedValueOnce(mockChannel); + } + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Establish 100 connections + for (let i = 0; i < 100; i += 1) { + await sendRemoteMessage(`peer-${i}`, 'msg'); + } + // Attempt to establish 101st connection should fail + await expect(sendRemoteMessage('peer-101', 'msg')).rejects.toThrow( + ResourceLimitError, + ); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledTimes(100); + }); + + it('respects custom maxConcurrentConnections option', async () => { + const customLimit = 5; + const mockChannels: MockChannel[] = []; + // Create mock channels up to custom limit + for (let i = 0; i < customLimit; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce(mockChannel); + } + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { maxConcurrentConnections: customLimit }, + vi.fn(), + ); + // Establish connections up to custom limit + for (let i = 0; i < customLimit; i += 1) { + await sendRemoteMessage(`peer-${i}`, 'msg'); + } + // Attempt to establish connection beyond custom limit should fail + await expect(sendRemoteMessage('peer-exceed', 'msg')).rejects.toThrow( + ResourceLimitError, + ); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledTimes( + customLimit, + ); + }); + + it('rejects inbound connections when limit reached', async () => { + let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation( + (handler) => { + inboundHandler = handler; + }, + ); + const mockChannels: MockChannel[] = []; + // Create 100 mock channels for outbound connections + for (let i = 0; i < 100; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce(mockChannel); + } + const { 
sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Establish 100 outbound connections + for (let i = 0; i < 100; i += 1) { + await sendRemoteMessage(`peer-${i}`, 'msg'); + } + // Attempt inbound connection should be rejected + const inboundChannel = createMockChannel('inbound-peer'); + inboundHandler?.(inboundChannel); + // Should not add to channels (connection rejected) + expect(mockLogger.log).toHaveBeenCalledWith( + 'inbound-peer:: rejecting inbound connection due to connection limit', + ); + }); + }); + + describe('message size limit', () => { + it('rejects messages exceeding 1MB size limit', async () => { + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Create a message larger than 1MB + const largeMessage = 'x'.repeat(1024 * 1024 + 1); // 1MB + 1 byte + await expect(sendRemoteMessage('peer-1', largeMessage)).rejects.toThrow( + ResourceLimitError, + ); + expect(mockConnectionFactory.dialIdempotent).not.toHaveBeenCalled(); + expect(mockMessageQueue.enqueue).not.toHaveBeenCalled(); + }); + + it('allows messages at exactly 1MB size limit', async () => { + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Create a message exactly 1MB + const exactSizeMessage = 'x'.repeat(1024 * 1024); + await sendRemoteMessage('peer-1', exactSizeMessage); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalled(); + expect(mockChannel.msgStream.write).toHaveBeenCalled(); + }); + + it('validates message size before queueing during reconnection', async () => { + mockReconnectionManager.isReconnecting.mockReturnValue(true); + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Create a message larger than 1MB + const largeMessage = 'x'.repeat(1024 * 1024 + 1); + await expect(sendRemoteMessage('peer-1', largeMessage)).rejects.toThrow( + ResourceLimitError, + 
); + // Should not queue the message + expect(mockMessageQueue.enqueue).not.toHaveBeenCalled(); + }); + + it('respects custom maxMessageSizeBytes option', async () => { + const customLimit = 500 * 1024; // 500KB + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { maxMessageSizeBytes: customLimit }, + vi.fn(), + ); + // Create a message larger than custom limit + const largeMessage = 'x'.repeat(customLimit + 1); + await expect(sendRemoteMessage('peer-1', largeMessage)).rejects.toThrow( + ResourceLimitError, + ); + // Create a message at exactly custom limit + const exactSizeMessage = 'x'.repeat(customLimit); + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + await sendRemoteMessage('peer-1', exactSizeMessage); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalled(); + }); + }); + + describe('stale peer cleanup', () => { + it('sets up periodic cleanup interval', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + await initNetwork('0x1234', {}, vi.fn()); + expect(setIntervalSpy).toHaveBeenCalledWith( + expect.any(Function), + 15 * 60 * 1000, + ); + expect(intervalFn).toBeDefined(); + setIntervalSpy.mockRestore(); + }); + + it('cleans up interval on stop', async () => { + const clearIntervalSpy = vi.spyOn(global, 'clearInterval'); + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((_fn: () => void, _ms?: number) => { + return 42 as unknown as NodeJS.Timeout; + }); + const { stop } = await initNetwork('0x1234', {}, vi.fn()); + await stop(); + expect(clearIntervalSpy).toHaveBeenCalledWith(42); + setIntervalSpy.mockRestore(); + clearIntervalSpy.mockRestore(); + }); + + it('does not clean up peers with active connections', async () => { + let intervalFn: 
(() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + // Establish connection (sets lastConnectionTime) + await sendRemoteMessage('peer-1', 'msg'); + // Run cleanup immediately; should not remove active peer + intervalFn?.(); + await sendRemoteMessage('peer-1', 'msg2'); + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledTimes(1); + setIntervalSpy.mockRestore(); + }); + + it('does not clean up peers currently reconnecting', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const mockChannel = createMockChannel('peer-1'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + mockReconnectionManager.isReconnecting.mockReturnValue(true); + const { sendRemoteMessage } = await initNetwork('0x1234', {}, vi.fn()); + await sendRemoteMessage('peer-1', 'msg'); + // Run cleanup immediately; reconnecting peer should not be cleaned + intervalFn?.(); + expect(mockMessageQueue.enqueue).toHaveBeenCalledWith('msg'); + setIntervalSpy.mockRestore(); + }); + }); }); diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index fb6cebadf..45b8d4e7a 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -1,4 +1,8 @@ -import { AbortError, isRetryableNetworkError } from '@metamask/kernel-errors'; +import { + AbortError, + isRetryableNetworkError, + ResourceLimitError, +} from '@metamask/kernel-errors'; import { 
abortableDelay, DEFAULT_MAX_RETRY_ATTEMPTS, @@ -22,6 +26,18 @@ import type { /** Default upper bound for queued outbound messages while reconnecting */ const DEFAULT_MAX_QUEUE = 200; +/** Default maximum number of concurrent connections */ +const DEFAULT_MAX_CONCURRENT_CONNECTIONS = 100; + +/** Default maximum message size in bytes (1MB) */ +const DEFAULT_MAX_MESSAGE_SIZE_BYTES = 1024 * 1024; + +/** Default stale peer cleanup interval in milliseconds (15 minutes) */ +const DEFAULT_CLEANUP_INTERVAL_MS = 15 * 60 * 1000; + +/** Default stale peer timeout in milliseconds (1 hour) */ +const DEFAULT_STALE_PEER_TIMEOUT_MS = 60 * 60 * 1000; + /** * Initialize the remote comm system with information that must be provided by the kernel. * @@ -30,6 +46,10 @@ const DEFAULT_MAX_QUEUE = 200; * @param options.relays - PeerIds/Multiaddrs of known message relays. * @param options.maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default). * @param options.maxQueue - Maximum number of messages to queue per peer while reconnecting (default: 200). + * @param options.maxConcurrentConnections - Maximum number of concurrent connections (default: 100). + * @param options.maxMessageSizeBytes - Maximum message size in bytes (default: 1MB). + * @param options.cleanupIntervalMs - Stale peer cleanup interval in milliseconds (default: 15 minutes). + * @param options.stalePeerTimeoutMs - Stale peer timeout in milliseconds (default: 1 hour). * @param remoteMessageHandler - Handler to be called when messages are received from elsewhere. * @param onRemoteGiveUp - Optional callback to be called when we give up on a remote (after max retries or non-retryable error). 
* @@ -51,6 +71,10 @@ export async function initNetwork( relays = [], maxRetryAttempts, maxQueue = DEFAULT_MAX_QUEUE, + maxConcurrentConnections = DEFAULT_MAX_CONCURRENT_CONNECTIONS, + maxMessageSizeBytes = DEFAULT_MAX_MESSAGE_SIZE_BYTES, + cleanupIntervalMs = DEFAULT_CLEANUP_INTERVAL_MS, + stalePeerTimeoutMs = DEFAULT_STALE_PEER_TIMEOUT_MS, } = options; let cleanupWakeDetector: (() => void) | undefined; const stopController = new AbortController(); @@ -60,6 +84,7 @@ export async function initNetwork( const reconnectionManager = new ReconnectionManager(); const messageQueues = new Map(); // One queue per peer const intentionallyClosed = new Set(); // Track peers that intentionally closed connections + const lastConnectionTime = new Map(); // Track last connection time for cleanup const connectionFactory = await ConnectionFactory.make( keySeed, relays, @@ -68,6 +93,8 @@ export async function initNetwork( maxRetryAttempts, ); const locationHints = new Map(); + let cleanupIntervalId: ReturnType | undefined; + /** * Output an error message. * @@ -285,6 +312,7 @@ export async function initNetwork( // Add channel to manager channels.set(peerId, channel); + lastConnectionTime.set(peerId, Date.now()); logger.log(`${peerId}:: reconnection successful`); @@ -368,6 +396,98 @@ export async function initNetwork( } } + /** + * Validate message size before sending or queuing. + * + * @param message - The message to validate. + * @throws ResourceLimitError if message exceeds size limit. + */ + function validateMessageSize(message: string): void { + const messageSizeBytes = new TextEncoder().encode(message).length; + if (messageSizeBytes > maxMessageSizeBytes) { + throw new ResourceLimitError( + `Message size ${messageSizeBytes} bytes exceeds limit of ${maxMessageSizeBytes} bytes`, + { + data: { + limitType: 'messageSize', + current: messageSizeBytes, + limit: maxMessageSizeBytes, + }, + }, + ); + } + } + + /** + * Check if we can establish a new connection (within connection limit). 
+ * + * @throws ResourceLimitError if connection limit is reached. + */ + function checkConnectionLimit(): void { + const currentConnections = channels.size; + if (currentConnections >= maxConcurrentConnections) { + throw new ResourceLimitError( + `Connection limit reached: ${currentConnections}/${maxConcurrentConnections} concurrent connections`, + { + data: { + limitType: 'connection', + current: currentConnections, + limit: maxConcurrentConnections, + }, + }, + ); + } + } + + /** + * Clean up stale peer data for peers disconnected for more than 1 hour. + */ + function cleanupStalePeers(): void { + const now = Date.now(); + const stalePeers: string[] = []; + + // Check all tracked peers + for (const [peerId, lastTime] of lastConnectionTime.entries()) { + const timeSinceLastConnection = now - lastTime; + const hasActiveChannel = channels.has(peerId); + const isReconnecting = reconnectionManager.isReconnecting(peerId); + + // Consider peer stale if: + // - No active channel + // - Not currently reconnecting + // - Disconnected for more than stalePeerTimeoutMs + if ( + !hasActiveChannel && + !isReconnecting && + timeSinceLastConnection > stalePeerTimeoutMs + ) { + stalePeers.push(peerId); + } + } + + // Clean up stale peer data + for (const peerId of stalePeers) { + const lastTime = lastConnectionTime.get(peerId); + if (lastTime !== undefined) { + const minutesSinceDisconnect = Math.round((now - lastTime) / 1000 / 60); + logger.log( + `${peerId}:: cleaning up stale peer data (disconnected for ${minutesSinceDisconnect} minutes)`, + ); + } + + // Remove from all tracking structures + lastConnectionTime.delete(peerId); + messageQueues.delete(peerId); + locationHints.delete(peerId); + + // Clear reconnection state if any + if (reconnectionManager.isReconnecting(peerId)) { + reconnectionManager.stopReconnection(peerId); + } + reconnectionManager.clearPeer(peerId); + } + } + /** * Send a message to a peer. 
* @@ -382,6 +502,9 @@ export async function initNetwork( return; } + // Validate message size before processing + validateMessageSize(message); + // Check if peer is intentionally closed if (intentionallyClosed.has(targetPeerId)) { throw new Error('Message delivery failed after intentional close'); @@ -400,6 +523,9 @@ export async function initNetwork( let channel = channels.get(targetPeerId); if (!channel) { + // Check connection limit before dialing new connection + checkConnectionLimit(); + try { const hints = locationHints.get(targetPeerId) ?? []; channel = await connectionFactory.dialIdempotent( @@ -419,6 +545,7 @@ export async function initNetwork( } channels.set(targetPeerId, channel); + lastConnectionTime.set(targetPeerId, Date.now()); } catch (problem) { outputError(targetPeerId, `opening connection`, problem); handleConnectionLoss(targetPeerId); @@ -435,6 +562,7 @@ export async function initNetwork( logger.log(`${targetPeerId}:: send ${message}`); await writeWithTimeout(channel, fromString(message), 10_000); reconnectionManager.resetBackoff(targetPeerId); + lastConnectionTime.set(targetPeerId, Date.now()); } catch (problem) { outputError(targetPeerId, `sending message`, problem); handleConnectionLoss(targetPeerId); @@ -460,7 +588,20 @@ export async function initNetwork( // Don't add to channels map and don't start reading - connection will naturally close return; } + + // Check connection limit for inbound connections + try { + checkConnectionLimit(); + } catch { + logger.log( + `${channel.peerId}:: rejecting inbound connection due to connection limit`, + ); + // Don't add to channels map - connection will naturally close + return; + } + channels.set(channel.peerId, channel); + lastConnectionTime.set(channel.peerId, Date.now()); readChannel(channel).catch((error) => { outputError(channel.peerId, 'error in inbound channel read', error); }); @@ -469,6 +610,13 @@ export async function initNetwork( // Install wake detector to reset backoff on sleep/wake 
cleanupWakeDetector = installWakeDetector(handleWakeFromSleep); + // Start periodic cleanup task for stale peers + cleanupIntervalId = setInterval(() => { + if (!signal.aborted) { + cleanupStalePeers(); + } + }, cleanupIntervalMs); + /** * Explicitly close a connection to a peer. * Marks the peer as intentionally closed to prevent automatic reconnection. @@ -541,12 +689,18 @@ export async function initNetwork( cleanupWakeDetector(); cleanupWakeDetector = undefined; } + // Stop cleanup interval + if (cleanupIntervalId) { + clearInterval(cleanupIntervalId); + cleanupIntervalId = undefined; + } stopController.abort(); // cancels all delays and dials await connectionFactory.stop(); channels.clear(); reconnectionManager.clear(); messageQueues.clear(); intentionallyClosed.clear(); + lastConnectionTime.clear(); } // Return the sender with a stop handle and connection management functions diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index 2b895fbb2..66f51c626 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -44,6 +44,26 @@ export type RemoteCommsOptions = { * If not provided, uses the default MAX_QUEUE value. */ maxQueue?: number | undefined; + /** + * Maximum number of concurrent connections. + * If not provided, uses the default MAX_CONCURRENT_CONNECTIONS value (100). + */ + maxConcurrentConnections?: number | undefined; + /** + * Maximum message size in bytes. + * If not provided, uses the default MAX_MESSAGE_SIZE_BYTES value (1MB). + */ + maxMessageSizeBytes?: number | undefined; + /** + * Stale peer cleanup interval in milliseconds. + * If not provided, uses the default CLEANUP_INTERVAL_MS value (15 minutes). + */ + cleanupIntervalMs?: number | undefined; + /** + * Stale peer timeout in milliseconds (time before a disconnected peer is considered stale). + * If not provided, uses the default STALE_PEER_TIMEOUT_MS value (1 hour). 
+ */ + stalePeerTimeoutMs?: number | undefined; }; export type RemoteInfo = { diff --git a/vitest.config.ts b/vitest.config.ts index 607ab8890..2d2681b33 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -92,10 +92,10 @@ export default defineConfig({ lines: 87.52, }, 'packages/kernel-errors/**': { - statements: 100, - functions: 100, + statements: 97.57, + functions: 96.07, branches: 100, - lines: 100, + lines: 97.57, }, 'packages/kernel-language-model-service/**': { statements: 98.35, @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.26, + branches: 93.25, lines: 97.57, }, 'packages/kernel-utils/**': { @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.53, - functions: 98.54, - branches: 97.59, - lines: 96.53, + statements: 96.25, + functions: 98.55, + branches: 97.32, + lines: 96.25, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 1627ba63006677b7e66d44d26cbf206354276f4f Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 18 Dec 2025 16:01:07 +0100 Subject: [PATCH 2/8] fix bugs --- .../src/errors/ResourceLimitError.test.ts | 233 ++++++++++++++++++ .../src/errors/ResourceLimitError.ts | 12 +- .../ocap-kernel/src/remotes/network.test.ts | 145 ++++++++++- packages/ocap-kernel/src/remotes/network.ts | 38 +++ vitest.config.ts | 14 +- 5 files changed, 433 insertions(+), 9 deletions(-) create mode 100644 packages/kernel-errors/src/errors/ResourceLimitError.test.ts diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.test.ts b/packages/kernel-errors/src/errors/ResourceLimitError.test.ts new file mode 100644 index 000000000..ae2bf6da4 --- /dev/null +++ b/packages/kernel-errors/src/errors/ResourceLimitError.test.ts @@ -0,0 +1,233 @@ +import { describe, it, expect } from 'vitest'; + +import { ResourceLimitError } from './ResourceLimitError.ts'; +import { ErrorCode, ErrorSentinel } from 
'../constants.ts'; +import { unmarshalErrorOptions } from '../marshal/unmarshalError.ts'; +import type { MarshaledOcapError } from '../types.ts'; + +describe('ResourceLimitError', () => { + it('creates a ResourceLimitError with the correct properties', () => { + const error = new ResourceLimitError('Connection limit exceeded'); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.code).toBe(ErrorCode.ResourceLimitError); + expect(error.message).toBe('Connection limit exceeded'); + expect(error.data).toBeUndefined(); + }); + + it('creates a ResourceLimitError with connection limit data', () => { + const error = new ResourceLimitError('Connection limit exceeded', { + data: { + limitType: 'connection', + current: 100, + limit: 100, + }, + }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.code).toBe(ErrorCode.ResourceLimitError); + expect(error.message).toBe('Connection limit exceeded'); + expect(error.data).toStrictEqual({ + limitType: 'connection', + current: 100, + limit: 100, + }); + }); + + it('creates a ResourceLimitError with message size limit data', () => { + const error = new ResourceLimitError('Message size limit exceeded', { + data: { + limitType: 'messageSize', + current: 1048577, + limit: 1048576, + }, + }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.code).toBe(ErrorCode.ResourceLimitError); + expect(error.message).toBe('Message size limit exceeded'); + expect(error.data).toStrictEqual({ + limitType: 'messageSize', + current: 1048577, + limit: 1048576, + }); + }); + + it('creates a ResourceLimitError with partial data', () => { + const error = new ResourceLimitError('Resource limit exceeded', { + data: { + limitType: 'connection', + }, + }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.data).toStrictEqual({ + limitType: 'connection', + }); + }); + + it('creates a ResourceLimitError with a cause', () => { + const cause = new Error('Original error'); + const error = new 
ResourceLimitError('Resource limit exceeded', { cause }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.code).toBe(ErrorCode.ResourceLimitError); + expect(error.cause).toBe(cause); + }); + + it('creates a ResourceLimitError with a custom stack', () => { + const customStack = 'custom stack trace'; + const error = new ResourceLimitError('Resource limit exceeded', { + stack: customStack, + }); + expect(error).toBeInstanceOf(ResourceLimitError); + expect(error.stack).toBe(customStack); + }); + + it('unmarshals a valid marshaled ResourceLimitError with connection limit data', () => { + const marshaledError: MarshaledOcapError = { + [ErrorSentinel]: true, + message: 'Connection limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'connection', + current: 100, + limit: 100, + }, + stack: 'stack trace', + }; + + const unmarshaledError = ResourceLimitError.unmarshal( + marshaledError, + unmarshalErrorOptions, + ); + expect(unmarshaledError).toBeInstanceOf(ResourceLimitError); + expect(unmarshaledError.code).toBe(ErrorCode.ResourceLimitError); + expect(unmarshaledError.message).toBe('Connection limit exceeded'); + expect(unmarshaledError.stack).toBe('stack trace'); + expect(unmarshaledError.data).toStrictEqual({ + limitType: 'connection', + current: 100, + limit: 100, + }); + }); + + it('unmarshals a valid marshaled ResourceLimitError with message size limit data', () => { + const marshaledError: MarshaledOcapError = { + [ErrorSentinel]: true, + message: 'Message size limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'messageSize', + current: 1048577, + limit: 1048576, + }, + stack: 'stack trace', + }; + + const unmarshaledError = ResourceLimitError.unmarshal( + marshaledError, + unmarshalErrorOptions, + ); + expect(unmarshaledError).toBeInstanceOf(ResourceLimitError); + expect(unmarshaledError.code).toBe(ErrorCode.ResourceLimitError); + expect(unmarshaledError.message).toBe('Message size limit 
exceeded'); + expect(unmarshaledError.data).toStrictEqual({ + limitType: 'messageSize', + current: 1048577, + limit: 1048576, + }); + }); + + it('unmarshals a valid marshaled ResourceLimitError without data', () => { + const marshaledError = { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: ErrorCode.ResourceLimitError, + stack: 'stack trace', + } as unknown as MarshaledOcapError; + + const unmarshaledError = ResourceLimitError.unmarshal( + marshaledError, + unmarshalErrorOptions, + ); + expect(unmarshaledError).toBeInstanceOf(ResourceLimitError); + expect(unmarshaledError.code).toBe(ErrorCode.ResourceLimitError); + expect(unmarshaledError.message).toBe('Resource limit exceeded'); + expect(unmarshaledError.data).toBeUndefined(); + }); + + it.each([ + { + name: 'invalid limitType value', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'invalid', + current: 100, + limit: 100, + }, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: data.limitType -- Expected the value to satisfy a union of `literal | literal`, but received: "invalid"', + }, + { + name: 'invalid current type', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'connection', + current: 'not a number', + limit: 100, + }, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: data.current -- Expected a number, but received: "not a number"', + }, + { + name: 'invalid limit type', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: ErrorCode.ResourceLimitError, + data: { + limitType: 'connection', + current: 100, + limit: 'not a number', + }, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: data.limit -- Expected a number, but received: "not a 
number"', + }, + { + name: 'wrong error code', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + code: 'WRONG_ERROR_CODE' as ErrorCode, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: code -- Expected the literal `"RESOURCE_LIMIT_ERROR"`, but received: "WRONG_ERROR_CODE"', + }, + { + name: 'missing required fields', + marshaledError: { + [ErrorSentinel]: true, + message: 'Resource limit exceeded', + // Missing code field + } as unknown as MarshaledOcapError, + expectedError: + 'At path: code -- Expected the literal `"RESOURCE_LIMIT_ERROR"`, but received: undefined', + }, + ])( + 'throws an error when unmarshaling with $name', + ({ marshaledError, expectedError }) => { + expect(() => + ResourceLimitError.unmarshal(marshaledError, unmarshalErrorOptions), + ).toThrow(expectedError); + }, + ); +}); diff --git a/packages/kernel-errors/src/errors/ResourceLimitError.ts b/packages/kernel-errors/src/errors/ResourceLimitError.ts index 1d11c8f14..cec6fcef8 100644 --- a/packages/kernel-errors/src/errors/ResourceLimitError.ts +++ b/packages/kernel-errors/src/errors/ResourceLimitError.ts @@ -60,7 +60,17 @@ export class ResourceLimitError extends BaseError { ): ResourceLimitError { assert(marshaledError, this.struct); const options = unmarshalErrorOptions(marshaledError); - return new ResourceLimitError(marshaledError.message, options); + const data = marshaledError.data as + | { + limitType?: 'connection' | 'messageSize'; + current?: number; + limit?: number; + } + | undefined; + return new ResourceLimitError(marshaledError.message, { + ...options, + ...(data !== undefined && { data }), + }); } } harden(ResourceLimitError); diff --git a/packages/ocap-kernel/src/remotes/network.test.ts b/packages/ocap-kernel/src/remotes/network.test.ts index 7388338a6..03d0c3f76 100644 --- a/packages/ocap-kernel/src/remotes/network.test.ts +++ b/packages/ocap-kernel/src/remotes/network.test.ts @@ -1,5 +1,5 @@ import { 
AbortError, ResourceLimitError } from '@metamask/kernel-errors'; -import { makeAbortSignalMock } from '@ocap/repo-tools/test-utils'; +import { delay, makeAbortSignalMock } from '@ocap/repo-tools/test-utils'; import { describe, expect, @@ -2147,4 +2147,147 @@ describe('network.initNetwork', () => { setIntervalSpy.mockRestore(); }); }); + + describe('reconnection respects connection limit', () => { + it('blocks reconnection when connection limit is reached', async () => { + const customLimit = 2; + const mockChannels: MockChannel[] = []; + // Create mock channels + for (let i = 0; i < customLimit; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + } + // Set up reconnection state + let reconnecting = false; + mockReconnectionManager.isReconnecting.mockImplementation( + () => reconnecting, + ); + mockReconnectionManager.startReconnection.mockImplementation(() => { + reconnecting = true; + }); + mockReconnectionManager.stopReconnection.mockImplementation(() => { + reconnecting = false; + }); + mockReconnectionManager.shouldRetry.mockReturnValue(true); + mockReconnectionManager.incrementAttempt.mockReturnValue(1); + mockReconnectionManager.calculateBackoff.mockReturnValue(100); // Small delay to ensure ordering + const { abortableDelay } = await import('@metamask/kernel-utils'); + (abortableDelay as ReturnType<typeof vi.fn>).mockImplementation( + async (ms: number) => { + // Use real delay to allow other operations to complete + await new Promise((resolve) => setTimeout(resolve, ms)); + }, + ); + // Set up dial mocks - initial connections + mockConnectionFactory.dialIdempotent + .mockResolvedValueOnce(mockChannels[0]) // peer-0 + .mockResolvedValueOnce(mockChannels[1]); // peer-1 + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { maxConcurrentConnections: customLimit }, + vi.fn(), + ); + // Establish connections up to limit + await sendRemoteMessage('peer-0', 'msg'); + await sendRemoteMessage('peer-1', 'msg'); + //
Disconnect peer-0 (simulate connection loss) + const peer0Channel = mockChannels[0] as MockChannel; + peer0Channel.msgStream.write.mockRejectedValueOnce( + Object.assign(new Error('Connection lost'), { code: 'ECONNRESET' }), + ); + // Trigger reconnection for peer-0 (this will remove peer-0 from channels) + await sendRemoteMessage('peer-0', 'msg2'); + // Wait for connection loss to be handled (channel removed) + await vi.waitFor( + () => { + expect( + mockReconnectionManager.startReconnection, + ).toHaveBeenCalledWith('peer-0'); + }, + { timeout: 1000 }, + ); + // Now fill the connection limit with a new peer (peer-0 is removed, so we have space) + // Ensure new-peer is NOT in reconnecting state + mockReconnectionManager.isReconnecting.mockImplementation((peerId) => { + return peerId === 'peer-0'; // Only peer-0 is reconnecting + }); + const newPeerChannel = createMockChannel('new-peer'); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce( + newPeerChannel, + ); + await sendRemoteMessage('new-peer', 'msg'); + // Wait a bit to ensure new-peer connection is fully established in channels map + await delay(50); + // Mock successful dial for reconnection attempt (but limit will block it) + const reconnectChannel = createMockChannel('peer-0'); + mockConnectionFactory.dialIdempotent.mockResolvedValueOnce( + reconnectChannel, + ); + // Verify reconnection started + expect(mockReconnectionManager.startReconnection).toHaveBeenCalledWith( + 'peer-0', + ); + // Wait for reconnection attempt to be blocked + await vi.waitFor( + () => { + // Should have logged that reconnection was blocked by limit + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining( + 'peer-0:: reconnection blocked by connection limit', + ), + ); + }, + { timeout: 5000 }, + ); + // Verify reconnection continues (doesn't stop) - shouldRetry should be called + // meaning the loop continues after the limit check fails + expect(mockReconnectionManager.shouldRetry).toHaveBeenCalled(); + 
}, 10000); + }); + + describe('connection limit race condition', () => { + it('prevents exceeding limit when multiple concurrent dials occur', async () => { + const customLimit = 2; + const mockChannels: MockChannel[] = []; + + // Create mock channels + for (let i = 0; i < customLimit + 1; i += 1) { + const mockChannel = createMockChannel(`peer-${i}`); + mockChannels.push(mockChannel); + } + + // Set up dial mocks - all dials will succeed + mockConnectionFactory.dialIdempotent.mockImplementation( + async (peerId: string) => { + // Simulate async dial delay + await delay(10); + return mockChannels.find((ch) => ch.peerId === peerId) as MockChannel; + }, + ); + + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { maxConcurrentConnections: customLimit }, + vi.fn(), + ); + // Start multiple concurrent dials that all pass the initial limit check + const sendPromises = Promise.all([ + sendRemoteMessage('peer-0', 'msg0'), + sendRemoteMessage('peer-1', 'msg1'), + sendRemoteMessage('peer-2', 'msg2'), // This should be blocked after dial + ]); + await sendPromises; + // Verify that only 2 channels were added (the limit) + // The third one should have been rejected after dial completed + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('peer-2:: connection limit reached after dial'), + ); + // Verify that peer-2's message was queued + expect(mockMessageQueue.enqueue).toHaveBeenCalledWith('msg2'); + // Verify that reconnection was started for peer-2 (to retry later) + expect(mockReconnectionManager.startReconnection).toHaveBeenCalledWith( + 'peer-2', + ); + }, 10000); + }); }); diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 45b8d4e7a..ee77ec3d0 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -310,6 +310,27 @@ export async function initNetwork( false, // No retry here, we're already in a retry loop ); + // Check 
connection limit before adding channel + // This prevents exceeding the limit if other connections were established + // during the reconnection delay + try { + checkConnectionLimit(); + } catch (limitError) { + // Connection limit reached - treat as retryable and continue loop + // The limit might free up when other connections close + logger.log( + `${peerId}:: reconnection blocked by connection limit, will retry`, + ); + outputError( + peerId, + `reconnection attempt ${nextAttempt}`, + limitError, + ); + // Don't add channel - connection will naturally close + // Continue the reconnection loop + continue; + } + // Add channel to manager channels.set(peerId, channel); lastConnectionTime.set(peerId, Date.now()); @@ -524,6 +545,7 @@ export async function initNetwork( let channel = channels.get(targetPeerId); if (!channel) { // Check connection limit before dialing new connection + // (Early check to fail fast, but we'll check again after dial to prevent race conditions) checkConnectionLimit(); try { @@ -544,6 +566,22 @@ export async function initNetwork( return; } + // Re-check connection limit after dial completes to prevent race conditions + // Multiple concurrent dials could all pass the initial check, then all add channels + try { + checkConnectionLimit(); + } catch { + // Connection limit reached - close the dialed channel and queue the message + logger.log( + `${targetPeerId}:: connection limit reached after dial, queueing message`, + ); + queue.enqueue(message); + // Don't add channel - connection will naturally close + // Start reconnection to retry later when limit might free up + handleConnectionLoss(targetPeerId); + return; + } + channels.set(targetPeerId, channel); lastConnectionTime.set(targetPeerId, Date.now()); } catch (problem) { diff --git a/vitest.config.ts b/vitest.config.ts index 2d2681b33..12d5d0c25 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -92,10 +92,10 @@ export default defineConfig({ lines: 87.52, }, 
'packages/kernel-errors/**': { - statements: 97.57, - functions: 96.07, + statements: 100, + functions: 100, branches: 100, - lines: 97.57, + lines: 100, }, 'packages/kernel-language-model-service/**': { statements: 98.35, @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.25, + branches: 93.26, lines: 97.57, }, 'packages/kernel-utils/**': { @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.25, + statements: 96.27, functions: 98.55, - branches: 97.32, - lines: 96.25, + branches: 97.33, + lines: 96.27, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 33be846c23d191d018435a64ada4681909b24904 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Thu, 18 Dec 2025 16:05:33 +0100 Subject: [PATCH 3/8] merge --- vitest.config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vitest.config.ts b/vitest.config.ts index 12d5d0c25..31579df29 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.26, + branches: 93.25, lines: 97.57, }, 'packages/kernel-utils/**': { From e12be1b1b246ea9535f17843e5e124c9312e5b69 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 19 Dec 2025 20:06:42 +0100 Subject: [PATCH 4/8] close channel to release network resources --- .../src/remotes/ConnectionFactory.test.ts | 48 ++++++++ .../src/remotes/ConnectionFactory.ts | 44 ++++++++ .../ocap-kernel/src/remotes/network.test.ts | 106 ++++++++++++++++++ packages/ocap-kernel/src/remotes/network.ts | 21 +++- vitest.config.ts | 8 +- 5 files changed, 220 insertions(+), 7 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/ConnectionFactory.test.ts b/packages/ocap-kernel/src/remotes/ConnectionFactory.test.ts index ff7841661..fe9bb320a 100644 --- 
a/packages/ocap-kernel/src/remotes/ConnectionFactory.test.ts +++ b/packages/ocap-kernel/src/remotes/ConnectionFactory.test.ts @@ -1111,6 +1111,54 @@ describe('ConnectionFactory', () => { }); }); + describe('closeChannel', () => { + it('closes underlying stream when close is available', async () => { + factory = await createFactory(); + const close = vi.fn().mockResolvedValue(undefined); + const channel = { + peerId: 'peer-close', + msgStream: { + unwrap: () => ({ close }), + }, + } as unknown as Channel; + await factory.closeChannel(channel, channel.peerId); + expect(close).toHaveBeenCalled(); + expect(mockLoggerLog).toHaveBeenCalledWith( + `${channel.peerId}:: closed channel stream`, + ); + }); + + it('aborts underlying stream when abort is available', async () => { + factory = await createFactory(); + const abort = vi.fn(); + const channel = { + peerId: 'peer-abort', + msgStream: { + unwrap: () => ({ abort }), + }, + } as unknown as Channel; + await factory.closeChannel(channel, channel.peerId); + expect(abort).toHaveBeenCalledWith(expect.any(AbortError)); + expect(mockLoggerLog).toHaveBeenCalledWith( + `${channel.peerId}:: aborted channel stream`, + ); + }); + + it('logs when underlying stream lacks close and abort', async () => { + factory = await createFactory(); + const channel = { + peerId: 'peer-none', + msgStream: { + unwrap: () => ({}), + }, + } as unknown as Channel; + await factory.closeChannel(channel, channel.peerId); + expect(mockLoggerLog).toHaveBeenCalledWith( + `${channel.peerId}:: channel stream lacks close/abort, relying on natural closure`, + ); + }); + }); + describe('integration scenarios', () => { it('handles complete connection lifecycle', async () => { createLibp2p.mockImplementation(async () => ({ diff --git a/packages/ocap-kernel/src/remotes/ConnectionFactory.ts b/packages/ocap-kernel/src/remotes/ConnectionFactory.ts index e833fd788..db3ffe2a0 100644 --- a/packages/ocap-kernel/src/remotes/ConnectionFactory.ts +++ 
b/packages/ocap-kernel/src/remotes/ConnectionFactory.ts @@ -323,6 +323,50 @@ export class ConnectionFactory { return promise; } + /** + * Close a channel's underlying stream to release network resources. + * + * @param channel - The channel to close. + * @param peerId - The peer ID for logging. + */ + async closeChannel(channel: Channel, peerId: string): Promise<void> { + try { + // ByteStream.unwrap() returns the underlying libp2p stream. + const maybeWrapper = channel.msgStream as unknown as { + unwrap?: () => unknown; + }; + const underlying = + typeof maybeWrapper.unwrap === 'function' + ? maybeWrapper.unwrap() + : undefined; + + const closable = underlying as + | { close?: () => Promise<void> } + | undefined; + if (closable?.close) { + await closable.close(); + this.#logger.log(`${peerId}:: closed channel stream`); + return; + } + + const abortable = underlying as + | { abort?: (error?: Error) => void } + | undefined; + if (abortable?.abort) { + abortable.abort(new AbortError()); + this.#logger.log(`${peerId}:: aborted channel stream`); + return; + } + + // If we cannot explicitly close/abort, log and rely on natural closure. + this.#logger.log( + `${peerId}:: channel stream lacks close/abort, relying on natural closure`, + ); + } catch (problem) { + this.#outputError(peerId, 'closing channel stream', problem); + } + } + /** * Output an error message.
* diff --git a/packages/ocap-kernel/src/remotes/network.test.ts b/packages/ocap-kernel/src/remotes/network.test.ts index 03d0c3f76..df55e88e4 100644 --- a/packages/ocap-kernel/src/remotes/network.test.ts +++ b/packages/ocap-kernel/src/remotes/network.test.ts @@ -60,6 +60,7 @@ const mockReconnectionManager = { resetBackoff: vi.fn(), resetAllBackoffs: vi.fn(), clear: vi.fn(), + clearPeer: vi.fn(), }; vi.mock('./ReconnectionManager.ts', () => { @@ -81,6 +82,8 @@ vi.mock('./ReconnectionManager.ts', () => { resetAllBackoffs = mockReconnectionManager.resetAllBackoffs; clear = mockReconnectionManager.clear; + + clearPeer = mockReconnectionManager.clearPeer; } return { ReconnectionManager: MockReconnectionManager, @@ -100,6 +103,7 @@ const mockConnectionFactory = { dialIdempotent: vi.fn(), onInboundConnection: vi.fn(), stop: vi.fn().mockResolvedValue(undefined), + closeChannel: vi.fn().mockResolvedValue(undefined), }; vi.mock('./ConnectionFactory.ts', () => { @@ -183,10 +187,12 @@ describe('network.initNetwork', () => { mockReconnectionManager.resetBackoff.mockClear(); mockReconnectionManager.resetAllBackoffs.mockClear(); mockReconnectionManager.clear.mockClear(); + mockReconnectionManager.clearPeer.mockClear(); mockConnectionFactory.dialIdempotent.mockClear(); mockConnectionFactory.onInboundConnection.mockClear(); mockConnectionFactory.stop.mockClear(); + mockConnectionFactory.closeChannel.mockClear(); mockLogger.log.mockClear(); mockLogger.error.mockClear(); @@ -2146,6 +2152,101 @@ describe('network.initNetwork', () => { expect(mockMessageQueue.enqueue).toHaveBeenCalledWith('msg'); setIntervalSpy.mockRestore(); }); + + it('cleans up stale peers and calls clearPeer', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const mockChannel = createMockChannel('peer-1'); + // End the 
inbound stream so the channel is removed from the active channels map. + // Stale cleanup only applies when there is no active channel. + mockChannel.msgStream.read.mockResolvedValueOnce(undefined); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const stalePeerTimeoutMs = 1; + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { stalePeerTimeoutMs }, + vi.fn(), + ); + // Establish connection (sets lastConnectionTime) + await sendRemoteMessage('peer-1', 'msg'); + // Wait until readChannel processes the stream end and removes the channel. + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith('peer-1:: stream ended'); + }); + // Ensure enough wall-clock time passes to exceed stalePeerTimeoutMs. + await delay(stalePeerTimeoutMs + 5); + // Run cleanup; stale peer should be cleaned + intervalFn?.(); + // Verify clearPeer was called + expect(mockReconnectionManager.clearPeer).toHaveBeenCalledWith('peer-1'); + // Verify cleanup log message + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('peer-1:: cleaning up stale peer data'), + ); + setIntervalSpy.mockRestore(); + }); + + it('respects custom cleanupIntervalMs option', async () => { + const customInterval = 30 * 60 * 1000; // 30 minutes + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((_fn: () => void, _ms?: number) => { + return 1 as unknown as NodeJS.Timeout; + }); + await initNetwork( + '0x1234', + { cleanupIntervalMs: customInterval }, + vi.fn(), + ); + expect(setIntervalSpy).toHaveBeenCalledWith( + expect.any(Function), + customInterval, + ); + setIntervalSpy.mockRestore(); + }); + + it('respects custom stalePeerTimeoutMs option', async () => { + let intervalFn: (() => void) | undefined; + const setIntervalSpy = vi + .spyOn(global, 'setInterval') + .mockImplementation((fn: () => void, _ms?: number) => { + intervalFn = fn; + return 1 as unknown as NodeJS.Timeout; + }); + const customTimeout = 50; + const 
mockChannel = createMockChannel('peer-1'); + // End the inbound stream so the channel is removed from the active channels map. + mockChannel.msgStream.read.mockResolvedValueOnce(undefined); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const { sendRemoteMessage } = await initNetwork( + '0x1234', + { + stalePeerTimeoutMs: customTimeout, + }, + vi.fn(), + ); + // Establish connection + await sendRemoteMessage('peer-1', 'msg'); + // Wait until readChannel processes the stream end and removes the channel. + await vi.waitFor(() => { + expect(mockLogger.log).toHaveBeenCalledWith('peer-1:: stream ended'); + }); + // Run cleanup quickly; peer should not be stale yet. + intervalFn?.(); + // Peer should not be cleaned (not stale yet) + expect(mockReconnectionManager.clearPeer).not.toHaveBeenCalled(); + // Wait beyond the custom timeout, then run cleanup again. + await delay(customTimeout + 10); + intervalFn?.(); + // Now peer should be cleaned + expect(mockReconnectionManager.clearPeer).toHaveBeenCalledWith('peer-1'); + setIntervalSpy.mockRestore(); + }); }); describe('reconnection respects connection limit', () => { @@ -2236,6 +2337,11 @@ describe('network.initNetwork', () => { 'peer-0:: reconnection blocked by connection limit', ), ); + // Verify closeChannel was called to release network resources + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + reconnectChannel, + 'peer-0', + ); }, { timeout: 5000 }, ); diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index ee77ec3d0..06337e8d2 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -326,7 +326,8 @@ export async function initNetwork( `reconnection attempt ${nextAttempt}`, limitError, ); - // Don't add channel - connection will naturally close + // Explicitly close the channel to release network resources + await connectionFactory.closeChannel(channel, peerId); // 
Continue the reconnection loop continue; } @@ -575,8 +576,9 @@ export async function initNetwork( logger.log( `${targetPeerId}:: connection limit reached after dial, queueing message`, ); + // Explicitly close the channel to release network resources + await connectionFactory.closeChannel(channel, targetPeerId); queue.enqueue(message); - // Don't add channel - connection will naturally close // Start reconnection to retry later when limit might free up handleConnectionLoss(targetPeerId); return; @@ -634,7 +636,20 @@ export async function initNetwork( logger.log( `${channel.peerId}:: rejecting inbound connection due to connection limit`, ); - // Don't add to channels map - connection will naturally close + // Explicitly close the channel to release network resources + const closePromise = connectionFactory.closeChannel( + channel, + channel.peerId, + ); + if (typeof closePromise?.catch === 'function') { + closePromise.catch((problem) => { + outputError( + channel.peerId, + 'closing rejected inbound channel', + problem, + ); + }); + } return; } diff --git a/vitest.config.ts b/vitest.config.ts index 31579df29..63bc9472c 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.25, + branches: 93.26, lines: 97.57, }, 'packages/kernel-utils/**': { @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.27, + statements: 96.4, functions: 98.55, - branches: 97.33, - lines: 96.27, + branches: 97.27, + lines: 96.4, }, 'packages/omnium-gatherum/**': { statements: 5.67, From a0aa335f82371fb5ff5914e778acb1e098d82d68 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 19 Dec 2025 20:20:17 +0100 Subject: [PATCH 5/8] small refactor --- packages/ocap-kernel/src/remotes/network.ts | 62 ++++++++++++--------- vitest.config.ts | 6 +- 2 files changed, 40 insertions(+), 28 deletions(-) diff 
--git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 06337e8d2..9be7a50a9 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -276,9 +276,7 @@ export async function initNetwork( logger.log( `${peerId}:: max reconnection attempts (${maxAttempts}) reached, giving up`, ); - reconnectionManager.stopReconnection(peerId); - queue.clear(); - onRemoteGiveUp?.(peerId); + giveUpOnPeer(peerId, queue); return; } @@ -332,17 +330,11 @@ export async function initNetwork( continue; } - // Add channel to manager - channels.set(peerId, channel); - lastConnectionTime.set(peerId, Date.now()); + // Register channel and start reading + registerChannel(peerId, channel); logger.log(`${peerId}:: reconnection successful`); - // Start reading from the new channel - readChannel(channel).catch((problem) => { - outputError(peerId, `reading channel to`, problem); - }); - // Flush queued messages await flushQueuedMessages(peerId, channel, queue); @@ -365,9 +357,7 @@ export async function initNetwork( } if (!isRetryableNetworkError(problem)) { outputError(peerId, `non-retryable failure`, problem); - reconnectionManager.stopReconnection(peerId); - queue.clear(); - onRemoteGiveUp?.(peerId); + giveUpOnPeer(peerId, queue); return; } outputError(peerId, `reconnection attempt ${nextAttempt}`, problem); @@ -461,6 +451,37 @@ export async function initNetwork( } } + /** + * Register a channel and start reading from it. + * + * @param peerId - The peer ID for the channel. + * @param channel - The channel to register. + * @param errorContext - Optional context for error messages when reading fails. 
+ */ + function registerChannel( + peerId: string, + channel: Channel, + errorContext = 'reading channel to', + ): void { + channels.set(peerId, channel); + lastConnectionTime.set(peerId, Date.now()); + readChannel(channel).catch((problem) => { + outputError(peerId, errorContext, problem); + }); + } + + /** + * Give up on a peer after max retries or non-retryable error. + * + * @param peerId - The peer ID to give up on. + * @param queue - The message queue for the peer. + */ + function giveUpOnPeer(peerId: string, queue: MessageQueue): void { + reconnectionManager.stopReconnection(peerId); + queue.clear(); + onRemoteGiveUp?.(peerId); + } + /** * Clean up stale peer data for peers disconnected for more than 1 hour. */ @@ -584,18 +605,13 @@ export async function initNetwork( return; } - channels.set(targetPeerId, channel); - lastConnectionTime.set(targetPeerId, Date.now()); + registerChannel(targetPeerId, channel); } catch (problem) { outputError(targetPeerId, `opening connection`, problem); handleConnectionLoss(targetPeerId); queue.enqueue(message); return; } - - readChannel(channel).catch((problem) => { - outputError(targetPeerId, `reading channel to`, problem); - }); } try { @@ -653,11 +669,7 @@ export async function initNetwork( return; } - channels.set(channel.peerId, channel); - lastConnectionTime.set(channel.peerId, Date.now()); - readChannel(channel).catch((error) => { - outputError(channel.peerId, 'error in inbound channel read', error); - }); + registerChannel(channel.peerId, channel, 'error in inbound channel read'); }); // Install wake detector to reset backoff on sleep/wake diff --git a/vitest.config.ts b/vitest.config.ts index 63bc9472c..d4cd4bcd1 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.4, - functions: 98.55, + statements: 96.41, + functions: 98.56, branches: 97.27, - lines: 96.4, + lines: 96.41, }, 
'packages/omnium-gatherum/**': { statements: 5.67, From c0795cce552fd0f9dd64e08c2c0fb483f85d4bdc Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 19 Dec 2025 20:29:20 +0100 Subject: [PATCH 6/8] update last timestamp on inbound message receipt --- packages/ocap-kernel/src/remotes/network.ts | 1 + vitest.config.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 9be7a50a9..a7dc568ab 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -218,6 +218,7 @@ export async function initNetwork( if (readBuf) { reconnectionManager.resetBackoff(channel.peerId); // successful inbound traffic await receiveMessage(channel.peerId, bufToString(readBuf.subarray())); + lastConnectionTime.set(channel.peerId, Date.now()); // update timestamp on inbound activity } else { // Stream ended (returned undefined), exit the read loop logger.log(`${channel.peerId}:: stream ended`); diff --git a/vitest.config.ts b/vitest.config.ts index d4cd4bcd1..bdf52fda0 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.26, + branches: 93.25, lines: 97.57, }, 'packages/kernel-utils/**': { From 25e81ac51ae3e58804a28e873ec59b128d0d9cd1 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 19 Dec 2025 20:47:12 +0100 Subject: [PATCH 7/8] fix yet another bug --- packages/ocap-kernel/src/remotes/network.ts | 42 ++++++++++++--------- vitest.config.ts | 6 +-- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index a7dc568ab..2bb5ea30a 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -589,24 +589,32 @@ export async function 
initNetwork( return; } - // Re-check connection limit after dial completes to prevent race conditions - // Multiple concurrent dials could all pass the initial check, then all add channels - try { - checkConnectionLimit(); - } catch { - // Connection limit reached - close the dialed channel and queue the message - logger.log( - `${targetPeerId}:: connection limit reached after dial, queueing message`, - ); - // Explicitly close the channel to release network resources - await connectionFactory.closeChannel(channel, targetPeerId); - queue.enqueue(message); - // Start reconnection to retry later when limit might free up - handleConnectionLoss(targetPeerId); - return; - } + // Check if a concurrent call already registered a channel for this peer + // (dialIdempotent may return the same channel due to deduplication) + const existingChannel = channels.get(targetPeerId); + if (existingChannel) { + // Another concurrent call already registered the channel, use it + channel = existingChannel; + } else { + // Re-check connection limit after dial completes to prevent race conditions + // Multiple concurrent dials could all pass the initial check, then all add channels + try { + checkConnectionLimit(); + } catch { + // Connection limit reached - close the dialed channel and queue the message + logger.log( + `${targetPeerId}:: connection limit reached after dial, queueing message`, + ); + // Explicitly close the channel to release network resources + await connectionFactory.closeChannel(channel, targetPeerId); + queue.enqueue(message); + // Start reconnection to retry later when limit might free up + handleConnectionLoss(targetPeerId); + return; + } - registerChannel(targetPeerId, channel); + registerChannel(targetPeerId, channel); + } } catch (problem) { outputError(targetPeerId, `opening connection`, problem); handleConnectionLoss(targetPeerId); diff --git a/vitest.config.ts b/vitest.config.ts index bdf52fda0..a021f2293 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ 
-130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.25, + branches: 93.26, lines: 97.57, }, 'packages/kernel-utils/**': { @@ -158,10 +158,10 @@ export default defineConfig({ lines: 22.22, }, 'packages/ocap-kernel/**': { - statements: 96.41, + statements: 96.42, functions: 98.56, branches: 97.27, - lines: 96.41, + lines: 96.42, }, 'packages/omnium-gatherum/**': { statements: 5.67, From 8e04986e95e00014d3fe6f4e2dcb61c10d527867 Mon Sep 17 00:00:00 2001 From: Dimitris Marlagkoutsos Date: Fri, 19 Dec 2025 21:14:18 +0100 Subject: [PATCH 8/8] Check connection limit for inbound connections --- packages/ocap-kernel/src/remotes/network.ts | 45 +++++++++++---------- vitest.config.ts | 4 +- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index 2bb5ea30a..20f20a005 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -654,28 +654,31 @@ export async function initNetwork( return; } - // Check connection limit for inbound connections - try { - checkConnectionLimit(); - } catch { - logger.log( - `${channel.peerId}:: rejecting inbound connection due to connection limit`, - ); - // Explicitly close the channel to release network resources - const closePromise = connectionFactory.closeChannel( - channel, - channel.peerId, - ); - if (typeof closePromise?.catch === 'function') { - closePromise.catch((problem) => { - outputError( - channel.peerId, - 'closing rejected inbound channel', - problem, - ); - }); + // Check connection limit for inbound connections only if no existing channel + // If a channel already exists, this is likely a reconnection and the peer already has a slot + if (!channels.has(channel.peerId)) { + try { + checkConnectionLimit(); + } catch { + logger.log( + `${channel.peerId}:: rejecting inbound connection due to connection limit`, + 
); + // Explicitly close the channel to release network resources + const closePromise = connectionFactory.closeChannel( + channel, + channel.peerId, + ); + if (typeof closePromise?.catch === 'function') { + closePromise.catch((problem) => { + outputError( + channel.peerId, + 'closing rejected inbound channel', + problem, + ); + }); + } + return; } - return; } registerChannel(channel.peerId, channel, 'error in inbound channel read'); diff --git a/vitest.config.ts b/vitest.config.ts index a021f2293..01caf03e9 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -82,7 +82,7 @@ export default defineConfig({ 'packages/kernel-agents/**': { statements: 94.15, functions: 94.64, - branches: 91.39, + branches: 91.42, lines: 94.15, }, 'packages/kernel-browser-runtime/**': { @@ -130,7 +130,7 @@ export default defineConfig({ 'packages/kernel-ui/**': { statements: 97.57, functions: 97.29, - branches: 93.26, + branches: 93.25, lines: 97.57, }, 'packages/kernel-utils/**': {