From 52f2d08a8637d0beb66b5e04435b8b7bcc999f43 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Fri, 19 Dec 2025 11:31:49 -0800 Subject: [PATCH 1/2] chore: QueueMessage is just string --- .../src/remotes/MessageQueue.test.ts | 79 ++++++------------- .../ocap-kernel/src/remotes/MessageQueue.ts | 16 ++-- .../ocap-kernel/src/remotes/network.test.ts | 30 +++---- packages/ocap-kernel/src/remotes/network.ts | 13 ++- 4 files changed, 47 insertions(+), 91 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/MessageQueue.test.ts b/packages/ocap-kernel/src/remotes/MessageQueue.test.ts index c5cbac9aa..d08b46704 100644 --- a/packages/ocap-kernel/src/remotes/MessageQueue.test.ts +++ b/packages/ocap-kernel/src/remotes/MessageQueue.test.ts @@ -1,7 +1,6 @@ import { describe, it, expect, beforeEach } from 'vitest'; import { MessageQueue } from './MessageQueue.ts'; -import type { QueuedMessage } from './MessageQueue.ts'; describe('MessageQueue', () => { let queue: MessageQueue; @@ -34,12 +33,8 @@ describe('MessageQueue', () => { queue.enqueue('message2'); expect(queue).toHaveLength(2); - expect(queue.messages[0]).toStrictEqual({ - message: 'message1', - }); - expect(queue.messages[1]).toStrictEqual({ - message: 'message2', - }); + expect(queue.messages[0]).toBe('message1'); + expect(queue.messages[1]).toBe('message2'); }); it('drops oldest message when at capacity', () => { @@ -55,9 +50,9 @@ describe('MessageQueue', () => { smallQueue.enqueue('msg4'); expect(smallQueue).toHaveLength(3); - expect(smallQueue.messages[0]?.message).toBe('msg2'); - expect(smallQueue.messages[1]?.message).toBe('msg3'); - expect(smallQueue.messages[2]?.message).toBe('msg4'); + expect(smallQueue.messages[0]).toBe('msg2'); + expect(smallQueue.messages[1]).toBe('msg3'); + expect(smallQueue.messages[2]).toBe('msg4'); }); it('maintains FIFO order when dropping messages', () => { @@ -69,10 +64,7 @@ describe('MessageQueue', () => { smallQueue.enqueue('fourth'); // Should have dropped 'first' and 'second' - expect(smallQueue.messages).toStrictEqual([ - { message: 'third' }, - { message: 'fourth' }, - ]); + expect(smallQueue.messages).toStrictEqual(['third', 'fourth']); }); }); @@ -83,11 +75,9 @@ describe('MessageQueue', () => { const dequeued = queue.dequeue(); - expect(dequeued).toStrictEqual({ - message: 'first', - }); + expect(dequeued).toBe('first'); expect(queue).toHaveLength(1); - expect(queue.messages[0]?.message).toBe('second'); + expect(queue.messages[0]).toBe('second'); }); it('returns undefined for empty queue', () => { @@ -99,9 +89,9 @@ describe('MessageQueue', () => { queue.enqueue('2'); queue.enqueue('3'); - expect(queue.dequeue()?.message).toBe('1'); - expect(queue.dequeue()?.message).toBe('2'); - expect(queue.dequeue()?.message).toBe('3'); + expect(queue.dequeue()).toBe('1'); + expect(queue.dequeue()).toBe('2'); + expect(queue.dequeue()).toBe('3'); expect(queue.dequeue()).toBeUndefined(); }); }); @@ -114,11 +104,7 @@ describe('MessageQueue', () => { const allMessages = queue.dequeueAll(); - expect(allMessages).toStrictEqual([ - { message: 'msg1' }, - { message: 'msg2' }, - { message: 'msg3' }, - ]); + expect(allMessages).toStrictEqual(['msg1', 'msg2', 'msg3']); expect(queue).toHaveLength(0); expect(queue.messages).toStrictEqual([]); }); @@ -134,7 +120,7 @@ describe('MessageQueue', () => { queue.enqueue('msg'); const result = queue.dequeueAll(); - result.push({ message: 'extra' }); + result.push('extra'); // Queue should still be empty after dequeueAll expect(queue).toHaveLength(0); @@ -151,8 +137,8 @@ describe('MessageQueue', () => { queue.dropOldest(); expect(queue).toHaveLength(2); - expect(queue.messages[0]?.message).toBe('second'); - expect(queue.messages[1]?.message).toBe('third'); + expect(queue.messages[0]).toBe('second'); + expect(queue.messages[1]).toBe('third'); }); it('handles empty queue gracefully', () => { @@ -195,7 +181,7 @@ describe('MessageQueue', () => { queue.enqueue('after'); expect(queue).toHaveLength(1); - expect(queue.messages[0]?.message).toBe('after'); + expect(queue.messages[0]).toBe('after'); }); }); @@ -224,10 +210,7 @@ describe('MessageQueue', () => { const { messages } = queue; - expect(messages).toStrictEqual([ - { message: 'msg1' }, - { message: 'msg2' }, - ]); + expect(messages).toStrictEqual(['msg1', 'msg2']); // TypeScript enforces read-only at compile time // At runtime, verify the array reference is the internal one @@ -246,7 +229,7 @@ describe('MessageQueue', () => { queue.dequeue(); const messages3 = queue.messages; expect(messages3).toHaveLength(1); - expect(messages3[0]?.message).toBe('second'); + expect(messages3[0]).toBe('second'); }); }); @@ -255,11 +238,7 @@ describe('MessageQueue', () => { queue.enqueue('old1'); queue.enqueue('old2'); - const newMessages: QueuedMessage[] = [ - { message: 'new1' }, - { message: 'new2' }, - { message: 'new3' }, - ]; + const newMessages: string[] = ['new1', 'new2', 'new3']; queue.replaceAll(newMessages); @@ -277,29 +256,23 @@ describe('MessageQueue', () => { }); it('is not affected by changes to input array', () => { - const messages: QueuedMessage[] = [{ message: 'msg1' }]; + const messages: string[] = ['msg1']; queue.replaceAll(messages); // Modify the input array - messages.push({ message: 'msg2' }); - messages[0] = { message: 'modified' }; + messages.push('msg2'); + messages[0] = 'modified'; // Queue should not be affected expect(queue).toHaveLength(1); - expect(queue.messages[0]).toStrictEqual({ - message: 'msg1', - }); + expect(queue.messages[0]).toBe('msg1'); }); it('works when replacing with more messages than capacity', () => { const smallQueue = new MessageQueue(2); - const messages: QueuedMessage[] = [ - { message: 'msg1' }, - { message: 'msg2' }, - { message: 'msg3' }, - ]; + const messages: string[] = ['msg1', 'msg2', 'msg3']; smallQueue.replaceAll(messages); @@ -316,7 +289,7 @@ describe('MessageQueue', () => { queue.enqueue('msg2'); const first = queue.dequeue(); - expect(first?.message).toBe('msg1'); + expect(first).toBe('msg1'); queue.enqueue('msg3'); queue.enqueue('msg4'); @@ -324,7 +297,7 @@ describe('MessageQueue', () => { expect(queue).toHaveLength(3); queue.dropOldest(); - expect(queue.messages[0]?.message).toBe('msg3'); + expect(queue.messages[0]).toBe('msg3'); const all = queue.dequeueAll(); expect(all).toHaveLength(2); diff --git a/packages/ocap-kernel/src/remotes/MessageQueue.ts b/packages/ocap-kernel/src/remotes/MessageQueue.ts index e42a734db..d8d763add 100644 --- a/packages/ocap-kernel/src/remotes/MessageQueue.ts +++ b/packages/ocap-kernel/src/remotes/MessageQueue.ts @@ -1,12 +1,8 @@ -export type QueuedMessage = { - message: string; -}; - /** * Message queue management for remote communications. */ export class MessageQueue { - readonly #queue: QueuedMessage[] = []; + readonly #queue: string[] = []; readonly #maxCapacity: number; @@ -29,7 +25,7 @@ export class MessageQueue { if (this.#queue.length >= this.#maxCapacity) { this.dropOldest(); } - this.#queue.push({ message }); + this.#queue.push(message); } /** @@ -37,7 +33,7 @@ export class MessageQueue { * * @returns The first message in the queue, or undefined if the queue is empty. */ - dequeue(): QueuedMessage | undefined { + dequeue(): string | undefined { return this.#queue.shift(); } @@ -46,7 +42,7 @@ export class MessageQueue { * * @returns All messages in the queue. */ - dequeueAll(): QueuedMessage[] { + dequeueAll(): string[] { const messages = [...this.#queue]; this.#queue.length = 0; return messages; @@ -80,7 +76,7 @@ export class MessageQueue { * * @returns A read-only view of the messages. */ - get messages(): readonly QueuedMessage[] { + get messages(): readonly string[] { return this.#queue; } @@ -89,7 +85,7 @@ export class MessageQueue { * * @param messages - The new messages to replace the queue with. */ - replaceAll(messages: QueuedMessage[]): void { + replaceAll(messages: string[]): void { this.#queue.length = 0; this.#queue.push(...messages); } diff --git a/packages/ocap-kernel/src/remotes/network.test.ts b/packages/ocap-kernel/src/remotes/network.test.ts index 5b99c16ab..4366e8eac 100644 --- a/packages/ocap-kernel/src/remotes/network.test.ts +++ b/packages/ocap-kernel/src/remotes/network.test.ts @@ -10,8 +10,6 @@ import { beforeAll, } from 'vitest'; -import type { QueuedMessage } from './MessageQueue.ts'; - // Import the module we're testing - must be after mocks are set up let initNetwork: typeof import('./network.ts').initNetwork; @@ -23,7 +21,7 @@ const mockMessageQueue = { replaceAll: vi.fn(), clear: vi.fn(), length: 0, - messages: [] as QueuedMessage[], + messages: [] as string[], }; vi.mock('./MessageQueue.ts', () => { @@ -613,10 +611,7 @@ describe('network.initNetwork', () => { .mockReturnValueOnce({ message: 'queued-2' }) .mockReturnValue(undefined); mockMessageQueue.length = 2; - mockMessageQueue.messages = [ - { message: 'queued-1' }, - { message: 'queued-2' }, - ]; + mockMessageQueue.messages = ['queued-1', 'queued-2']; // Setup for reconnection scenario const mockChannel = createMockChannel('peer-1'); @@ -670,11 +665,7 @@ describe('network.initNetwork', () => { .mockReturnValueOnce({ message: 'queued-3' }) .mockReturnValue(undefined); mockMessageQueue.length = 3; - mockMessageQueue.messages = [ - { message: 'queued-1' }, - { message: 'queued-2' }, - { message: 'queued-3' }, - ]; + mockMessageQueue.messages = ['queued-1', 'queued-2', 'queued-3']; const mockChannel = createMockChannel('peer-1'); mockChannel.msgStream.write .mockRejectedValueOnce( @@ -865,10 +856,7 @@ describe('network.initNetwork', () => { // Set up queue with messages mockMessageQueue.length = 2; - mockMessageQueue.messages = [ - { message: 'queued-1' }, - { message: 'queued-2' }, - ]; + mockMessageQueue.messages = ['queued-1', 'queued-2']; await closeConnection('peer-1'); @@ -1320,7 +1308,7 @@ describe('network.initNetwork', () => { .mockReturnValueOnce({ message: 'queued-msg' }) .mockReturnValue(undefined); mockMessageQueue.length = 1; - mockMessageQueue.messages = [{ message: 'queued-msg' }]; + mockMessageQueue.messages = ['queued-msg']; const mockChannel = createMockChannel('peer-1'); mockChannel.msgStream.write.mockRejectedValue( @@ -1372,7 +1360,7 @@ describe('network.initNetwork', () => { .mockReturnValueOnce({ message: 'queued-msg' }) .mockReturnValue(undefined); mockMessageQueue.length = 1; - mockMessageQueue.messages = [{ message: 'queued-msg' }]; + mockMessageQueue.messages = ['queued-msg']; const mockChannel = createMockChannel('peer-1'); mockChannel.msgStream.write.mockRejectedValue( @@ -1440,8 +1428,8 @@ describe('network.initNetwork', () => { mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); // Set up queue with messages that will be flushed during reconnection // Each reconnection attempt will try to flush these messages, and they will fail - const queuedMsg1 = { message: 'queued-1' }; - const queuedMsg2 = { message: 'queued-2' }; + const queuedMsg1 = 'queued-1'; + const queuedMsg2 = 'queued-2'; // dequeue should return messages for each flush attempt (each reconnection) mockMessageQueue.dequeue.mockImplementation(() => { // Return messages in order, then undefined @@ -1654,7 +1642,7 @@ describe('network.initNetwork', () => { (abortableDelay as ReturnType).mockResolvedValue(undefined); // Set up queue with messages - const queuedMsg = { message: 'queued-msg' }; + const queuedMsg = 'queued-msg'; mockMessageQueue.dequeue .mockReturnValueOnce(queuedMsg) .mockReturnValue(undefined); diff --git a/packages/ocap-kernel/src/remotes/network.ts b/packages/ocap-kernel/src/remotes/network.ts index cbe315af9..fb6cebadf 100644 --- a/packages/ocap-kernel/src/remotes/network.ts +++ b/packages/ocap-kernel/src/remotes/network.ts @@ -9,7 +9,6 @@ import { toString as bufToString, fromString } from 'uint8arrays'; import { ConnectionFactory } from './ConnectionFactory.ts'; import { MessageQueue } from './MessageQueue.ts'; -import type { QueuedMessage } from './MessageQueue.ts'; import { ReconnectionManager } from './ReconnectionManager.ts'; import type { RemoteMessageHandler, @@ -143,7 +142,7 @@ export async function initNetwork( * @param from - The peer ID that the message is from. * @param message - The message to receive. */ - async function receiveMsg(from: string, message: string): Promise { + async function receiveMessage(from: string, message: string): Promise { logger.log(`${from}:: recv ${message}`); await remoteMessageHandler(from, message); } @@ -191,7 +190,7 @@ export async function initNetwork( } if (readBuf) { reconnectionManager.resetBackoff(channel.peerId); // successful inbound traffic - await receiveMsg(channel.peerId, bufToString(readBuf.subarray())); + await receiveMessage(channel.peerId, bufToString(readBuf.subarray())); } else { // Stream ended (returned undefined), exit the read loop logger.log(`${channel.peerId}:: stream ended`); @@ -346,13 +345,13 @@ export async function initNetwork( logger.log(`${peerId}:: flushing ${queue.length} queued messages`); // Process queued messages - const failedMessages: QueuedMessage[] = []; - let queuedMsg: QueuedMessage | undefined; + const failedMessages: string[] = []; + let queuedMsg: string | undefined; while ((queuedMsg = queue.dequeue()) !== undefined) { try { - logger.log(`${peerId}:: send (queued) ${queuedMsg.message}`); - await writeWithTimeout(channel, fromString(queuedMsg.message), 10_000); + logger.log(`${peerId}:: send (queued) ${queuedMsg}`); + await writeWithTimeout(channel, fromString(queuedMsg), 10_000); } catch (problem) { outputError(peerId, `sending queued message`, problem); // Preserve the failed message and all remaining messages From a5ca87358c8f28770db12f3f9a8f216120fb6df0 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Fri, 19 Dec 2025 11:41:26 -0800 Subject: [PATCH 2/2] Grrrr --- packages/ocap-kernel/src/remotes/network.test.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/network.test.ts b/packages/ocap-kernel/src/remotes/network.test.ts index 4366e8eac..d051f9528 100644 --- a/packages/ocap-kernel/src/remotes/network.test.ts +++ b/packages/ocap-kernel/src/remotes/network.test.ts @@ -607,8 +607,8 @@ describe('network.initNetwork', () => { it('flushes queued messages after successful reconnection', async () => { // Set up message queue with queued messages mockMessageQueue.dequeue - .mockReturnValueOnce({ message: 'queued-1' }) - .mockReturnValueOnce({ message: 'queued-2' }) + .mockReturnValueOnce('queued-1') + .mockReturnValueOnce('queued-2') .mockReturnValue(undefined); mockMessageQueue.length = 2; mockMessageQueue.messages = ['queued-1', 'queued-2']; @@ -660,9 +660,9 @@ describe('network.initNetwork', () => { mockReconnectionManager.calculateBackoff.mockReturnValue(0); // No delay for test // Set up message queue with multiple messages mockMessageQueue.dequeue - .mockReturnValueOnce({ message: 'queued-1' }) - .mockReturnValueOnce({ message: 'queued-2' }) - .mockReturnValueOnce({ message: 'queued-3' }) + .mockReturnValueOnce('queued-1') + .mockReturnValueOnce('queued-2') + .mockReturnValueOnce('queued-3') .mockReturnValue(undefined); mockMessageQueue.length = 3; mockMessageQueue.messages = ['queued-1', 'queued-2', 'queued-3']; @@ -1305,7 +1305,7 @@ describe('network.initNetwork', () => { // Set up queue with messages that will fail during flush mockMessageQueue.dequeue - .mockReturnValueOnce({ message: 'queued-msg' }) + .mockReturnValueOnce('queued-msg') .mockReturnValue(undefined); mockMessageQueue.length = 1; mockMessageQueue.messages = ['queued-msg']; @@ -1357,7 +1357,7 @@ describe('network.initNetwork', () => { // Set up queue with messages that will fail during flush mockMessageQueue.dequeue - .mockReturnValueOnce({ message: 'queued-msg' }) + .mockReturnValueOnce('queued-msg') .mockReturnValue(undefined); mockMessageQueue.length = 1; mockMessageQueue.messages = ['queued-msg'];