Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 26 additions & 53 deletions packages/ocap-kernel/src/remotes/MessageQueue.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { describe, it, expect, beforeEach } from 'vitest';

import { MessageQueue } from './MessageQueue.ts';
import type { QueuedMessage } from './MessageQueue.ts';

describe('MessageQueue', () => {
let queue: MessageQueue;
Expand Down Expand Up @@ -34,12 +33,8 @@ describe('MessageQueue', () => {
queue.enqueue('message2');

expect(queue).toHaveLength(2);
expect(queue.messages[0]).toStrictEqual({
message: 'message1',
});
expect(queue.messages[1]).toStrictEqual({
message: 'message2',
});
expect(queue.messages[0]).toBe('message1');
expect(queue.messages[1]).toBe('message2');
});

it('drops oldest message when at capacity', () => {
Expand All @@ -55,9 +50,9 @@ describe('MessageQueue', () => {
smallQueue.enqueue('msg4');

expect(smallQueue).toHaveLength(3);
expect(smallQueue.messages[0]?.message).toBe('msg2');
expect(smallQueue.messages[1]?.message).toBe('msg3');
expect(smallQueue.messages[2]?.message).toBe('msg4');
expect(smallQueue.messages[0]).toBe('msg2');
expect(smallQueue.messages[1]).toBe('msg3');
expect(smallQueue.messages[2]).toBe('msg4');
});

it('maintains FIFO order when dropping messages', () => {
Expand All @@ -69,10 +64,7 @@ describe('MessageQueue', () => {
smallQueue.enqueue('fourth');

// Should have dropped 'first' and 'second'
expect(smallQueue.messages).toStrictEqual([
{ message: 'third' },
{ message: 'fourth' },
]);
expect(smallQueue.messages).toStrictEqual(['third', 'fourth']);
});
});

Expand All @@ -83,11 +75,9 @@ describe('MessageQueue', () => {

const dequeued = queue.dequeue();

expect(dequeued).toStrictEqual({
message: 'first',
});
expect(dequeued).toBe('first');
expect(queue).toHaveLength(1);
expect(queue.messages[0]?.message).toBe('second');
expect(queue.messages[0]).toBe('second');
});

it('returns undefined for empty queue', () => {
Expand All @@ -99,9 +89,9 @@ describe('MessageQueue', () => {
queue.enqueue('2');
queue.enqueue('3');

expect(queue.dequeue()?.message).toBe('1');
expect(queue.dequeue()?.message).toBe('2');
expect(queue.dequeue()?.message).toBe('3');
expect(queue.dequeue()).toBe('1');
expect(queue.dequeue()).toBe('2');
expect(queue.dequeue()).toBe('3');
expect(queue.dequeue()).toBeUndefined();
});
});
Expand All @@ -114,11 +104,7 @@ describe('MessageQueue', () => {

const allMessages = queue.dequeueAll();

expect(allMessages).toStrictEqual([
{ message: 'msg1' },
{ message: 'msg2' },
{ message: 'msg3' },
]);
expect(allMessages).toStrictEqual(['msg1', 'msg2', 'msg3']);
expect(queue).toHaveLength(0);
expect(queue.messages).toStrictEqual([]);
});
Expand All @@ -134,7 +120,7 @@ describe('MessageQueue', () => {
queue.enqueue('msg');

const result = queue.dequeueAll();
result.push({ message: 'extra' });
result.push('extra');

// Queue should still be empty after dequeueAll
expect(queue).toHaveLength(0);
Expand All @@ -151,8 +137,8 @@ describe('MessageQueue', () => {
queue.dropOldest();

expect(queue).toHaveLength(2);
expect(queue.messages[0]?.message).toBe('second');
expect(queue.messages[1]?.message).toBe('third');
expect(queue.messages[0]).toBe('second');
expect(queue.messages[1]).toBe('third');
});

it('handles empty queue gracefully', () => {
Expand Down Expand Up @@ -195,7 +181,7 @@ describe('MessageQueue', () => {
queue.enqueue('after');

expect(queue).toHaveLength(1);
expect(queue.messages[0]?.message).toBe('after');
expect(queue.messages[0]).toBe('after');
});
});

Expand Down Expand Up @@ -224,10 +210,7 @@ describe('MessageQueue', () => {

const { messages } = queue;

expect(messages).toStrictEqual([
{ message: 'msg1' },
{ message: 'msg2' },
]);
expect(messages).toStrictEqual(['msg1', 'msg2']);

// TypeScript enforces read-only at compile time
// At runtime, verify the array reference is the internal one
Expand All @@ -246,7 +229,7 @@ describe('MessageQueue', () => {
queue.dequeue();
const messages3 = queue.messages;
expect(messages3).toHaveLength(1);
expect(messages3[0]?.message).toBe('second');
expect(messages3[0]).toBe('second');
});
});

Expand All @@ -255,11 +238,7 @@ describe('MessageQueue', () => {
queue.enqueue('old1');
queue.enqueue('old2');

const newMessages: QueuedMessage[] = [
{ message: 'new1' },
{ message: 'new2' },
{ message: 'new3' },
];
const newMessages: string[] = ['new1', 'new2', 'new3'];

queue.replaceAll(newMessages);

Expand All @@ -277,29 +256,23 @@ describe('MessageQueue', () => {
});

it('is not affected by changes to input array', () => {
const messages: QueuedMessage[] = [{ message: 'msg1' }];
const messages: string[] = ['msg1'];

queue.replaceAll(messages);

// Modify the input array
messages.push({ message: 'msg2' });
messages[0] = { message: 'modified' };
messages.push('msg2');
messages[0] = 'modified';

// Queue should not be affected
expect(queue).toHaveLength(1);
expect(queue.messages[0]).toStrictEqual({
message: 'msg1',
});
expect(queue.messages[0]).toBe('msg1');
});

it('works when replacing with more messages than capacity', () => {
const smallQueue = new MessageQueue(2);

const messages: QueuedMessage[] = [
{ message: 'msg1' },
{ message: 'msg2' },
{ message: 'msg3' },
];
const messages: string[] = ['msg1', 'msg2', 'msg3'];

smallQueue.replaceAll(messages);

Expand All @@ -316,15 +289,15 @@ describe('MessageQueue', () => {
queue.enqueue('msg2');

const first = queue.dequeue();
expect(first?.message).toBe('msg1');
expect(first).toBe('msg1');

queue.enqueue('msg3');
queue.enqueue('msg4');

expect(queue).toHaveLength(3);

queue.dropOldest();
expect(queue.messages[0]?.message).toBe('msg3');
expect(queue.messages[0]).toBe('msg3');

const all = queue.dequeueAll();
expect(all).toHaveLength(2);
Expand Down
16 changes: 6 additions & 10 deletions packages/ocap-kernel/src/remotes/MessageQueue.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
export type QueuedMessage = {
message: string;
};

/**
* Message queue management for remote communications.
*/
export class MessageQueue {
readonly #queue: QueuedMessage[] = [];
readonly #queue: string[] = [];

readonly #maxCapacity: number;

Expand All @@ -29,15 +25,15 @@ export class MessageQueue {
if (this.#queue.length >= this.#maxCapacity) {
this.dropOldest();
}
this.#queue.push({ message });
this.#queue.push(message);
}

/**
* Remove and return the first message in the queue.
*
* @returns The first message in the queue, or undefined if the queue is empty.
*/
dequeue(): QueuedMessage | undefined {
dequeue(): string | undefined {
return this.#queue.shift();
}

Expand All @@ -46,7 +42,7 @@ export class MessageQueue {
*
* @returns All messages in the queue.
*/
dequeueAll(): QueuedMessage[] {
dequeueAll(): string[] {
const messages = [...this.#queue];
this.#queue.length = 0;
return messages;
Expand Down Expand Up @@ -80,7 +76,7 @@ export class MessageQueue {
*
* @returns A read-only view of the messages.
*/
get messages(): readonly QueuedMessage[] {
get messages(): readonly string[] {
return this.#queue;
}

Expand All @@ -89,7 +85,7 @@ export class MessageQueue {
*
* @param messages - The new messages to replace the queue with.
*/
replaceAll(messages: QueuedMessage[]): void {
replaceAll(messages: string[]): void {
this.#queue.length = 0;
this.#queue.push(...messages);
}
Expand Down
44 changes: 16 additions & 28 deletions packages/ocap-kernel/src/remotes/network.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import {
beforeAll,
} from 'vitest';

import type { QueuedMessage } from './MessageQueue.ts';

// Import the module we're testing - must be after mocks are set up
let initNetwork: typeof import('./network.ts').initNetwork;

Expand All @@ -23,7 +21,7 @@ const mockMessageQueue = {
replaceAll: vi.fn(),
clear: vi.fn(),
length: 0,
messages: [] as QueuedMessage[],
messages: [] as string[],
};

vi.mock('./MessageQueue.ts', () => {
Expand Down Expand Up @@ -609,14 +607,11 @@ describe('network.initNetwork', () => {
it('flushes queued messages after successful reconnection', async () => {
// Set up message queue with queued messages
mockMessageQueue.dequeue
.mockReturnValueOnce({ message: 'queued-1' })
.mockReturnValueOnce({ message: 'queued-2' })
.mockReturnValueOnce('queued-1')
.mockReturnValueOnce('queued-2')
.mockReturnValue(undefined);
mockMessageQueue.length = 2;
mockMessageQueue.messages = [
{ message: 'queued-1' },
{ message: 'queued-2' },
];
mockMessageQueue.messages = ['queued-1', 'queued-2'];

// Setup for reconnection scenario
const mockChannel = createMockChannel('peer-1');
Expand Down Expand Up @@ -665,16 +660,12 @@ describe('network.initNetwork', () => {
mockReconnectionManager.calculateBackoff.mockReturnValue(0); // No delay for test
// Set up message queue with multiple messages
mockMessageQueue.dequeue
.mockReturnValueOnce({ message: 'queued-1' })
.mockReturnValueOnce({ message: 'queued-2' })
.mockReturnValueOnce({ message: 'queued-3' })
.mockReturnValueOnce('queued-1')
.mockReturnValueOnce('queued-2')
.mockReturnValueOnce('queued-3')
.mockReturnValue(undefined);
mockMessageQueue.length = 3;
mockMessageQueue.messages = [
{ message: 'queued-1' },
{ message: 'queued-2' },
{ message: 'queued-3' },
];
mockMessageQueue.messages = ['queued-1', 'queued-2', 'queued-3'];
const mockChannel = createMockChannel('peer-1');
mockChannel.msgStream.write
.mockRejectedValueOnce(
Expand Down Expand Up @@ -865,10 +856,7 @@ describe('network.initNetwork', () => {

// Set up queue with messages
mockMessageQueue.length = 2;
mockMessageQueue.messages = [
{ message: 'queued-1' },
{ message: 'queued-2' },
];
mockMessageQueue.messages = ['queued-1', 'queued-2'];

await closeConnection('peer-1');

Expand Down Expand Up @@ -1317,10 +1305,10 @@ describe('network.initNetwork', () => {

// Set up queue with messages that will fail during flush
mockMessageQueue.dequeue
.mockReturnValueOnce({ message: 'queued-msg' })
.mockReturnValueOnce('queued-msg')
.mockReturnValue(undefined);
mockMessageQueue.length = 1;
mockMessageQueue.messages = [{ message: 'queued-msg' }];
mockMessageQueue.messages = ['queued-msg'];

const mockChannel = createMockChannel('peer-1');
mockChannel.msgStream.write.mockRejectedValue(
Expand Down Expand Up @@ -1369,10 +1357,10 @@ describe('network.initNetwork', () => {

// Set up queue with messages that will fail during flush
mockMessageQueue.dequeue
.mockReturnValueOnce({ message: 'queued-msg' })
.mockReturnValueOnce('queued-msg')
.mockReturnValue(undefined);
mockMessageQueue.length = 1;
mockMessageQueue.messages = [{ message: 'queued-msg' }];
mockMessageQueue.messages = ['queued-msg'];

const mockChannel = createMockChannel('peer-1');
mockChannel.msgStream.write.mockRejectedValue(
Expand Down Expand Up @@ -1440,8 +1428,8 @@ describe('network.initNetwork', () => {
mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel);
// Set up queue with messages that will be flushed during reconnection
// Each reconnection attempt will try to flush these messages, and they will fail
const queuedMsg1 = { message: 'queued-1' };
const queuedMsg2 = { message: 'queued-2' };
const queuedMsg1 = 'queued-1';
const queuedMsg2 = 'queued-2';
// dequeue should return messages for each flush attempt (each reconnection)
mockMessageQueue.dequeue.mockImplementation(() => {
// Return messages in order, then undefined
Expand Down Expand Up @@ -1654,7 +1642,7 @@ describe('network.initNetwork', () => {
(abortableDelay as ReturnType<typeof vi.fn>).mockResolvedValue(undefined);

// Set up queue with messages
const queuedMsg = { message: 'queued-msg' };
const queuedMsg = 'queued-msg';
mockMessageQueue.dequeue
.mockReturnValueOnce(queuedMsg)
.mockReturnValue(undefined);
Expand Down
Loading
Loading