diff --git a/packages/plugin-bridge/src/channel/factory.ts b/packages/plugin-bridge/src/channel/factory.ts index aa208833..38c68b5d 100644 --- a/packages/plugin-bridge/src/channel/factory.ts +++ b/packages/plugin-bridge/src/channel/factory.ts @@ -25,3 +25,8 @@ export const getChannel = async (): Promise => { throw error; } }; + +export const isLeader = (): boolean => { + // DevTools UI (panel) is the leader and initiates handshake + return '__ROZENITE_PANEL__' in window; +}; diff --git a/packages/plugin-bridge/src/index.ts b/packages/plugin-bridge/src/index.ts index b4a094d6..6a949774 100644 --- a/packages/plugin-bridge/src/index.ts +++ b/packages/plugin-bridge/src/index.ts @@ -4,3 +4,6 @@ export type { Subscription } from './types'; export type { UseRozeniteDevToolsClientOptions } from './useRozeniteDevToolsClient'; export { getRozeniteDevToolsClient } from './client'; export { UnsupportedPlatformError } from './errors'; + +// v2 API (buffered, with handshake) +export * as unstable from './v2/index.js'; diff --git a/packages/plugin-bridge/src/v2/__tests__/client.test.ts b/packages/plugin-bridge/src/v2/__tests__/client.test.ts new file mode 100644 index 00000000..57d6e6d9 --- /dev/null +++ b/packages/plugin-bridge/src/v2/__tests__/client.test.ts @@ -0,0 +1,647 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { createClient } from '../client/factory.js'; +import { RozeniteDevToolsClient } from '../client/types.js'; +import { UserMessage } from '../connection/types.js'; +import { + createMockChannelPair, + wait, + waitForBothReady, + MockChannel, +} from '../test-utils/index.js'; + +type TestEventMap = { + 'test-event': { message: string }; + 'another-event': { count: number }; + 'ping': { id: number }; + 'pong': { id: number }; + 'batch': { index: number }; +}; + +describe('Plugin Bridge v2 - Client E2E Tests', () => { + let deviceChannel: MockChannel; + let panelChannel: MockChannel; + + beforeEach(() => { + [deviceChannel, panelChannel] = createMockChannelPair(); + }); + + describe('Basic messaging', () => { + it('should complete handshake automatically', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + await waitForBothReady(deviceClient, panelClient); + + expect(deviceClient.isReady()).toBe(true); + expect(panelClient.isReady()).toBe(true); + + deviceClient.close(); + panelClient.close(); + }); + + it('should send and receive messages with timestamp and data', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + const deviceMessages: UserMessage<{ message: string }>[] = []; + const panelMessages: UserMessage<{ message: string }>[] = []; + + deviceClient.onMessage('test-event', (msg) => { + deviceMessages.push(msg); + }); + + panelClient.onMessage('test-event', (msg) => { + panelMessages.push(msg); + }); + + await waitForBothReady(deviceClient, panelClient); + + const beforeSend = Date.now(); + panelClient.send('test-event', { message: 'from panel' }); + deviceClient.send('test-event', { message: 'from device' }); + + await wait(50); + + expect(deviceMessages).toHaveLength(1); + expect(deviceMessages[0].data).toEqual({ message: 'from panel' }); + expect(deviceMessages[0].timestamp).toBeGreaterThanOrEqual(beforeSend); + expect(deviceMessages[0].type).toBe('test-event'); + + expect(panelMessages).toHaveLength(1); + expect(panelMessages[0].data).toEqual({ message: 'from device' }); + expect(panelMessages[0].timestamp).toBeGreaterThanOrEqual(beforeSend); + + deviceClient.close(); + panelClient.close(); + }); + + it('should filter messages by type', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + const testEvents: UserMessage[] = []; + const anotherEvents: UserMessage[] = []; + + deviceClient.onMessage('test-event', (msg) => testEvents.push(msg)); + deviceClient.onMessage('another-event', (msg) => anotherEvents.push(msg)); + panelClient.onMessage('test-event', () => {}); + + await waitForBothReady(deviceClient, panelClient); + + panelClient.send('test-event', { message: 'hello' }); + panelClient.send('another-event', { count: 42 }); + panelClient.send('test-event', { message: 'world' }); + + await wait(50); + + expect(testEvents).toHaveLength(2); + expect(testEvents[0].data).toEqual({ message: 'hello' }); + expect(testEvents[1].data).toEqual({ message: 'world' }); + + expect(anotherEvents).toHaveLength(1); + expect(anotherEvents[0].data).toEqual({ count: 42 }); + + deviceClient.close(); + panelClient.close(); + }); + }); + + describe('Per-type buffering and replay', () => { + it('should buffer messages until handler is registered', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + // Panel registers handler to trigger handshake on that side + panelClient.onMessage('test-event', () => {}); + + await waitForBothReady(deviceClient, panelClient); + + // Send messages BEFORE device registers handler + panelClient.send('test-event', { message: 'early 1' }); + panelClient.send('test-event', { message: 'early 2' }); + + await wait(50); + + // Now register handler - should get replay + const receivedMessages: UserMessage<{ message: string }>[] = []; + deviceClient.onMessage('test-event', (msg) => { + receivedMessages.push(msg); + }); + + await wait(50); + + expect(receivedMessages).toHaveLength(2); + expect(receivedMessages[0].data).toEqual({ message: 'early 1' }); + expect(receivedMessages[1].data).toEqual({ message: 'early 2' }); + + deviceClient.close(); + panelClient.close(); + }); + + it('should replay only to first handler, not subsequent ones', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + panelClient.onMessage('test-event', () => {}); + await waitForBothReady(deviceClient, panelClient); + + // Send before any device handler + panelClient.send('test-event', { message: 'buffered' }); + await wait(50); + + const handler1Messages: UserMessage[] = []; + const handler2Messages: UserMessage[] = []; + + // First handler gets replay + deviceClient.onMessage('test-event', (msg) => handler1Messages.push(msg)); + await wait(50); + + // Second handler does NOT get replay + deviceClient.onMessage('test-event', (msg) => handler2Messages.push(msg)); + await wait(50); + + expect(handler1Messages).toHaveLength(1); + expect(handler2Messages).toHaveLength(0); + + // New message goes to both + panelClient.send('test-event', { message: 'new' }); + await wait(50); + + expect(handler1Messages).toHaveLength(2); + expect(handler2Messages).toHaveLength(1); + + deviceClient.close(); + panelClient.close(); + }); + + it('should buffer different message types independently', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + panelClient.onMessage('test-event', () => {}); + panelClient.onMessage('another-event', () => {}); + await waitForBothReady(deviceClient, panelClient); + + // Send different types before device handlers + panelClient.send('test-event', { message: 'test msg' }); + panelClient.send('another-event', { count: 42 }); + await wait(50); + + const testMessages: UserMessage[] = []; + const anotherMessages: UserMessage[] = []; + + // Register test-event handler - should only replay test-event + deviceClient.onMessage('test-event', (msg) => testMessages.push(msg)); + await wait(50); + + expect(testMessages).toHaveLength(1); + expect(anotherMessages).toHaveLength(0); + + // Register another-event handler - should only replay another-event + deviceClient.onMessage('another-event', (msg) => anotherMessages.push(msg)); + await wait(50); + + expect(testMessages).toHaveLength(1); + expect(anotherMessages).toHaveLength(1); + + deviceClient.close(); + panelClient.close(); + }); + }); + + describe('Handshake protocol', () => { + it('should follow INIT -> ACK -> COMPLETE sequence', async () => { + const handshakeSequence: string[] = []; + + const originalDeviceSend = deviceChannel.send.bind(deviceChannel); + const originalPanelSend = panelChannel.send.bind(panelChannel); + + deviceChannel.send = vi.fn((message: any) => { + if (message.type?.startsWith('__HANDSHAKE_')) { + handshakeSequence.push(`device:${message.type}`); + } + originalDeviceSend(message); + }); + + panelChannel.send = vi.fn((message: any) => { + if (message.type?.startsWith('__HANDSHAKE_')) { + handshakeSequence.push(`panel:${message.type}`); + } + originalPanelSend(message); + }); + + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + await waitForBothReady(deviceClient, panelClient); + + expect(handshakeSequence).toEqual([ + 'panel:__HANDSHAKE_INIT__', + 'device:__HANDSHAKE_ACK__', + 'panel:__HANDSHAKE_COMPLETE__', + ]); + + deviceClient.close(); + panelClient.close(); + }); + + it('should isolate messages by pluginId', async () => { + const plugin1Device = await createClient({ + pluginId: 'plugin-1', + channel: deviceChannel, + isLeader: false, + }); + + const plugin1Panel = await createClient({ + pluginId: 'plugin-1', + channel: panelChannel, + isLeader: true, + }); + + const plugin2Device = await createClient({ + pluginId: 'plugin-2', + channel: deviceChannel, + isLeader: false, + }); + + const plugin2Panel = await createClient({ + pluginId: 'plugin-2', + channel: panelChannel, + isLeader: true, + }); + + const plugin1Messages: UserMessage[] = []; + const plugin2Messages: UserMessage[] = []; + + plugin1Device.onMessage('test-event', (msg) => plugin1Messages.push(msg)); + plugin2Device.onMessage('test-event', (msg) => plugin2Messages.push(msg)); + plugin1Panel.onMessage('test-event', () => {}); + plugin2Panel.onMessage('test-event', () => {}); + + await waitForBothReady(plugin1Device, plugin1Panel); + await waitForBothReady(plugin2Device, plugin2Panel); + + plugin1Panel.send('test-event', { message: 'for plugin 1' }); + plugin2Panel.send('test-event', { message: 'for plugin 2' }); + + await wait(50); + + expect(plugin1Messages).toHaveLength(1); + expect(plugin1Messages[0].data).toEqual({ message: 'for plugin 1' }); + + expect(plugin2Messages).toHaveLength(1); + expect(plugin2Messages[0].data).toEqual({ message: 'for plugin 2' }); + + plugin1Device.close(); + plugin1Panel.close(); + plugin2Device.close(); + plugin2Panel.close(); + }); + }); + + describe('Reconnection', () => { + it('should handle DevTools UI reload', async () => { + const [panelChannel, deviceChannel] = createMockChannelPair(); + + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient1 = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + panelClient1.onMessage('test-event', () => {}); + deviceClient.onMessage('test-event', () => {}); + + await waitForBothReady(deviceClient, panelClient1); + + // Close first panel (simulate reload) + panelClient1.close(); + + // New panel connects + const panelClient2 = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + const messages: UserMessage[] = []; + panelClient2.onMessage('test-event', (msg) => messages.push(msg)); + + await waitForBothReady(deviceClient, panelClient2); + + deviceClient.send('test-event', { message: 'after reload' }); + await wait(50); + + expect(messages).toHaveLength(1); + expect(messages[0].data).toEqual({ message: 'after reload' }); + + deviceClient.close(); + panelClient2.close(); + }); + }); + + describe('High-volume messaging', () => { + it('should handle large batches of messages', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + panelClient.onMessage('batch', () => {}); + + await waitForBothReady(deviceClient, panelClient); + + const MESSAGE_COUNT = 100; + const receivedMessages: UserMessage<{ index: number }>[] = []; + + deviceClient.onMessage('batch', (msg) => receivedMessages.push(msg)); + + for (let i = 0; i < MESSAGE_COUNT; i++) { + panelClient.send('batch', { index: i }); + } + + await wait(200); + + expect(receivedMessages).toHaveLength(MESSAGE_COUNT); + + // Verify order is preserved + for (let i = 0; i < MESSAGE_COUNT; i++) { + expect(receivedMessages[i].data.index).toBe(i); + } + + deviceClient.close(); + panelClient.close(); + }); + + it('should respect buffer limits', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + buffer: { maxPerType: 10 }, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + panelClient.onMessage('batch', () => {}); + await waitForBothReady(deviceClient, panelClient); + + // Send 50 messages before handler is registered on device + for (let i = 0; i < 50; i++) { + panelClient.send('batch', { index: i }); + } + + await wait(100); + + // Now register handler + const receivedMessages: UserMessage<{ index: number }>[] = []; + deviceClient.onMessage('batch', (msg) => receivedMessages.push(msg)); + + await wait(50); + + // Should only have last 10 (maxPerType) + expect(receivedMessages.length).toBeLessThanOrEqual(10); + + deviceClient.close(); + panelClient.close(); + }); + }); + + describe('Bidirectional communication', () => { + it('should support ping-pong pattern', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + // Device responds to ping with pong + deviceClient.onMessage('ping', (msg) => { + deviceClient.send('pong', { id: msg.data.id }); + }); + + panelClient.onMessage('ping', () => {}); + + await waitForBothReady(deviceClient, panelClient); + + const pongReceived = new Promise((resolve) => { + panelClient.onMessage('pong', (msg) => resolve(msg.data.id)); + }); + + panelClient.send('ping', { id: 42 }); + + const result = await pongReceived; + expect(result).toBe(42); + + deviceClient.close(); + panelClient.close(); + }); + }); + + describe('Handler management', () => { + it('should handle multiple handlers for the same type', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + const handler1Messages: UserMessage[] = []; + const handler2Messages: UserMessage[] = []; + + deviceClient.onMessage('test-event', (msg) => handler1Messages.push(msg)); + deviceClient.onMessage('test-event', (msg) => handler2Messages.push(msg)); + panelClient.onMessage('test-event', () => {}); + + await waitForBothReady(deviceClient, panelClient); + + panelClient.send('test-event', { message: 'broadcast' }); + await wait(50); + + expect(handler1Messages).toHaveLength(1); + expect(handler2Messages).toHaveLength(1); + + deviceClient.close(); + panelClient.close(); + }); + + it('should handle listener removal', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + const messages: UserMessage[] = []; + + const subscription = deviceClient.onMessage('test-event', (msg) => { + messages.push(msg); + }); + + panelClient.onMessage('test-event', () => {}); + + await waitForBothReady(deviceClient, panelClient); + + panelClient.send('test-event', { message: 'first' }); + await wait(50); + + subscription.remove(); + + panelClient.send('test-event', { message: 'second' }); + await wait(50); + + expect(messages).toHaveLength(1); + + deviceClient.close(); + panelClient.close(); + }); + }); + + describe('Edge cases', () => { + it('should handle close gracefully', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + deviceClient.close(); + panelClient.close(); + + expect(deviceClient.isReady()).toBe(false); + }); + + it('should handle onReady callback when already ready', async () => { + const deviceClient = await createClient({ + pluginId: 'test-plugin', + channel: deviceChannel, + isLeader: false, + }); + + const panelClient = await createClient({ + pluginId: 'test-plugin', + channel: panelChannel, + isLeader: true, + }); + + deviceClient.onMessage('test-event', () => {}); + panelClient.onMessage('test-event', () => {}); + + await waitForBothReady(deviceClient, panelClient); + + const readyCallback = vi.fn(); + deviceClient.onReady(readyCallback); + + await wait(20); + + expect(readyCallback).toHaveBeenCalledTimes(1); + + deviceClient.close(); + panelClient.close(); + }); + }); +}); diff --git a/packages/plugin-bridge/src/v2/__tests__/connection.test.ts b/packages/plugin-bridge/src/v2/__tests__/connection.test.ts new file mode 100644 index 00000000..271a7f5e --- /dev/null +++ b/packages/plugin-bridge/src/v2/__tests__/connection.test.ts @@ -0,0 +1,441 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { createHandshakeConnection } from '../connection/handshake-connection.js'; +import { createBufferedConnection } from '../connection/buffered-connection.js'; +import { + HANDSHAKE_INIT, + HANDSHAKE_ACK, + HANDSHAKE_COMPLETE, + WireMessage, +} from '../connection/types.js'; +import { + createMockChannelPair, + wait, + waitFor, + MockChannel, +} from '../test-utils/index.js'; + +const createWireMessage = (pluginId: string, type: string, data: unknown): WireMessage => ({ + pluginId, + type, + timestamp: Date.now(), + data, +}); + +describe('Connection Layer', () => { + let channelA: MockChannel; + let channelB: MockChannel; + + beforeEach(() => { + [channelA, channelB] = createMockChannelPair(); + }); + + describe('HandshakeConnection', () => { + describe('Handshake Protocol', () => { + it('should complete handshake with INIT -> ACK -> COMPLETE sequence', async () => { + const handshakeMessages: string[] = []; + + const originalSendA = channelA.send.bind(channelA); + const originalSendB = channelB.send.bind(channelB); + + channelA.send = vi.fn((msg: unknown) => { + const message = msg as Record; + if (typeof message.type === 'string' && message.type.startsWith('__HANDSHAKE_')) { + handshakeMessages.push(`A:${message.type}`); + } + originalSendA(msg); + }); + + channelB.send = vi.fn((msg: unknown) => { + const message = msg as Record; + if (typeof message.type === 'string' && message.type.startsWith('__HANDSHAKE_')) { + handshakeMessages.push(`B:${message.type}`); + } + originalSendB(msg); + }); + + const leaderConnection = createHandshakeConnection(channelA, { + pluginId: 'test', + isLeader: true, + }); + + const followerConnection = createHandshakeConnection(channelB, { + pluginId: 'test', + isLeader: false, + }); + + await waitFor(() => leaderConnection.isReady() && followerConnection.isReady()); + + expect(handshakeMessages).toEqual([ + `A:${HANDSHAKE_INIT}`, + `B:${HANDSHAKE_ACK}`, + `A:${HANDSHAKE_COMPLETE}`, + ]); + + leaderConnection.close(); + followerConnection.close(); + }); + + it('should report isReady() correctly throughout handshake', async () => { + const leaderConnection = createHandshakeConnection(channelA, { + pluginId: 'test', + isLeader: true, + }); + + expect(leaderConnection.isReady()).toBe(false); + + const followerConnection = createHandshakeConnection(channelB, { + pluginId: 'test', + isLeader: false, + }); + + expect(followerConnection.isReady()).toBe(false); + + await waitFor(() => leaderConnection.isReady() && followerConnection.isReady()); + + expect(leaderConnection.isReady()).toBe(true); + expect(followerConnection.isReady()).toBe(true); + + leaderConnection.close(); + followerConnection.close(); + }); + + it('should call onReady callbacks when handshake completes', async () => { + const leaderReadyCallback = vi.fn(); + const followerReadyCallback = vi.fn(); + + const leaderConnection = createHandshakeConnection(channelA, { + pluginId: 'test', + isLeader: true, + }); + + const followerConnection = createHandshakeConnection(channelB, { + pluginId: 'test', + isLeader: false, + }); + + leaderConnection.onReady(leaderReadyCallback); + followerConnection.onReady(followerReadyCallback); + + await waitFor(() => leaderConnection.isReady() && followerConnection.isReady()); + await wait(10); + + expect(leaderReadyCallback).toHaveBeenCalledTimes(1); + expect(followerReadyCallback).toHaveBeenCalledTimes(1); + + leaderConnection.close(); + followerConnection.close(); + }); + + it('should call onReady immediately if already ready', async () => { + const leaderConnection = createHandshakeConnection(channelA, { + pluginId: 'test', + isLeader: true, + }); + + const followerConnection = createHandshakeConnection(channelB, { + pluginId: 'test', + isLeader: false, + }); + + await waitFor(() => leaderConnection.isReady()); + + const lateCallback = vi.fn(); + leaderConnection.onReady(lateCallback); + + await wait(10); + expect(lateCallback).toHaveBeenCalledTimes(1); + + leaderConnection.close(); + followerConnection.close(); + }); + }); + + describe('Message Routing', () => { + it('should forward wire messages to listeners', async () => { + const leaderConnection = createHandshakeConnection(channelA, { + pluginId: 'test', + isLeader: true, + }); + + const followerConnection = createHandshakeConnection(channelB, { + pluginId: 'test', + isLeader: false, + }); + + const receivedMessages: WireMessage[] = []; + followerConnection.onMessage((msg) => { + receivedMessages.push(msg as WireMessage); + }); + + await waitFor(() => leaderConnection.isReady() && followerConnection.isReady()); + + const wireMsg = createWireMessage('test', 'test-event', { hello: 'world' }); + leaderConnection.send(wireMsg); + + await wait(20); + + expect(receivedMessages).toHaveLength(1); + expect(receivedMessages[0].type).toBe('test-event'); + expect(receivedMessages[0].data).toEqual({ hello: 'world' }); + expect(receivedMessages[0].timestamp).toBeDefined(); + + leaderConnection.close(); + followerConnection.close(); + }); + + it('should filter messages by pluginId', async () => { + const connectionA1 = createHandshakeConnection(channelA, { + pluginId: 'plugin-1', + isLeader: true, + }); + + const connectionB1 = createHandshakeConnection(channelB, { + pluginId: 'plugin-1', + isLeader: false, + }); + + const connectionA2 = createHandshakeConnection(channelA, { + pluginId: 'plugin-2', + isLeader: true, + }); + + const connectionB2 = createHandshakeConnection(channelB, { + pluginId: 'plugin-2', + isLeader: false, + }); + + const plugin1Messages: WireMessage[] = []; + const plugin2Messages: WireMessage[] = []; + + connectionB1.onMessage((msg) => plugin1Messages.push(msg as WireMessage)); + connectionB2.onMessage((msg) => plugin2Messages.push(msg as WireMessage)); + + await waitFor(() => connectionA1.isReady() && connectionB1.isReady()); + await waitFor(() => connectionA2.isReady() && connectionB2.isReady()); + + connectionA1.send(createWireMessage('plugin-1', 'event', 'for plugin 1')); + connectionA2.send(createWireMessage('plugin-2', 'event', 'for plugin 2')); + + await wait(20); + + expect(plugin1Messages).toHaveLength(1); + expect(plugin1Messages[0].data).toBe('for plugin 1'); + + expect(plugin2Messages).toHaveLength(1); + expect(plugin2Messages[0].data).toBe('for plugin 2'); + + connectionA1.close(); + connectionB1.close(); + connectionA2.close(); + connectionB2.close(); + }); + }); + + describe('Reconnection', () => { + it('should handle leader reconnection (DevTools reload)', async () => { + const followerConnection = createHandshakeConnection(channelB, { + pluginId: 'test', + isLeader: false, + }); + + const leaderConnection1 = createHandshakeConnection(channelA, { + pluginId: 'test', + isLeader: true, + }); + + await waitFor(() => leaderConnection1.isReady() && followerConnection.isReady()); + expect(followerConnection.isReady()).toBe(true); + + leaderConnection1.close(); + + const leaderConnection2 = createHandshakeConnection(channelA, { + pluginId: 'test', + isLeader: true, + }); + + await waitFor(() => leaderConnection2.isReady()); + + expect(leaderConnection2.isReady()).toBe(true); + expect(followerConnection.isReady()).toBe(true); + + const messages: WireMessage[] = []; + leaderConnection2.onMessage((msg) => messages.push(msg as WireMessage)); + + followerConnection.send(createWireMessage('test', 'test', 'after reconnect')); + + await wait(20); + expect(messages).toHaveLength(1); + + leaderConnection2.close(); + followerConnection.close(); + }); + }); + + describe('Manual start', () => { + it('should not start handshake when autoStart is false', async () => { + const leaderConnection = createHandshakeConnection(channelA, { + pluginId: 'test', + isLeader: true, + autoStart: false, + }); + + const followerConnection = createHandshakeConnection(channelB, { + pluginId: 'test', + isLeader: false, + autoStart: false, + }); + + await wait(50); + + expect(leaderConnection.isReady()).toBe(false); + expect(followerConnection.isReady()).toBe(false); + + // Manually start + leaderConnection.signalReady(); + + await waitFor(() => leaderConnection.isReady() && followerConnection.isReady()); + + expect(leaderConnection.isReady()).toBe(true); + expect(followerConnection.isReady()).toBe(true); + + leaderConnection.close(); + followerConnection.close(); + }); + }); + }); + + describe('BufferedConnection', () => { + describe('Message Queuing', () => { + it('should queue outgoing messages until ready', async () => { + const handshakeConnection = createHandshakeConnection(channelA, { + pluginId: 'test', + isLeader: true, + }); + + const bufferedConnection = createBufferedConnection(handshakeConnection); + + // Send before ready + bufferedConnection.send(createWireMessage('test', 'early', 1)); + bufferedConnection.send(createWireMessage('test', 'early', 2)); + + const followerConnection = createHandshakeConnection(channelB, { + pluginId: 'test', + isLeader: false, + }); + + const receivedMessages: WireMessage[] = []; + followerConnection.onMessage((msg) => receivedMessages.push(msg as WireMessage)); + + await waitFor(() => handshakeConnection.isReady() && followerConnection.isReady()); + await wait(20); + + expect(receivedMessages).toHaveLength(2); + expect(receivedMessages[0].data).toBe(1); + expect(receivedMessages[1].data).toBe(2); + + bufferedConnection.close(); + followerConnection.close(); + }); + + it('should respect maxQueueSize and drop oldest on overflow', async () => { + const handshakeConnection = createHandshakeConnection(channelA, { + pluginId: 'test', + isLeader: true, + }); + + const bufferedConnection = createBufferedConnection(handshakeConnection, { + maxQueueSize: 3, + overflowStrategy: 'drop-oldest', + }); + + for (let i = 1; i <= 5; i++) { + bufferedConnection.send(createWireMessage('test', 'msg', i)); + } + + const followerConnection = createHandshakeConnection(channelB, { + pluginId: 'test', + isLeader: false, + }); + + const receivedMessages: WireMessage[] = []; + followerConnection.onMessage((msg) => receivedMessages.push(msg as WireMessage)); + + await waitFor(() => handshakeConnection.isReady()); + await wait(20); + + expect(receivedMessages).toHaveLength(3); + expect(receivedMessages[0].data).toBe(3); + expect(receivedMessages[1].data).toBe(4); + expect(receivedMessages[2].data).toBe(5); + + bufferedConnection.close(); + followerConnection.close(); + }); + + it('should drop newest on overflow when configured', async () => { + const handshakeConnection = createHandshakeConnection(channelA, { + pluginId: 'test', + isLeader: true, + }); + + const bufferedConnection = createBufferedConnection(handshakeConnection, { + maxQueueSize: 3, + overflowStrategy: 'drop-newest', + }); + + for (let i = 1; i <= 5; i++) { + bufferedConnection.send(createWireMessage('test', 'msg', i)); + } + + const followerConnection = createHandshakeConnection(channelB, { + pluginId: 'test', + isLeader: false, + }); + + const receivedMessages: WireMessage[] = []; + followerConnection.onMessage((msg) => receivedMessages.push(msg as WireMessage)); + + await waitFor(() => handshakeConnection.isReady()); + await wait(20); + + expect(receivedMessages).toHaveLength(3); + expect(receivedMessages[0].data).toBe(1); + expect(receivedMessages[1].data).toBe(2); + expect(receivedMessages[2].data).toBe(3); + + bufferedConnection.close(); + followerConnection.close(); + }); + }); + + describe('Passthrough when ready', () => { + it('should send messages immediately when ready', async () => { + const leaderHandshake = createHandshakeConnection(channelA, { + pluginId: 'test', + isLeader: true, + }); + const leaderBuffered = createBufferedConnection(leaderHandshake); + + const followerHandshake = createHandshakeConnection(channelB, { + pluginId: 'test', + isLeader: false, + }); + + const receivedMessages: WireMessage[] = []; + followerHandshake.onMessage((msg) => receivedMessages.push(msg as WireMessage)); + + await waitFor(() => leaderBuffered.isReady()); + + leaderBuffered.send(createWireMessage('test', 'immediate', 'now')); + + await wait(20); + + expect(receivedMessages).toHaveLength(1); + expect(receivedMessages[0].data).toBe('now'); + + leaderBuffered.close(); + followerHandshake.close(); + }); + }); + }); +}); diff --git a/packages/plugin-bridge/src/v2/__tests__/typed-message-buffer.test.ts b/packages/plugin-bridge/src/v2/__tests__/typed-message-buffer.test.ts new file mode 100644 index 00000000..f33a15e3 --- /dev/null +++ b/packages/plugin-bridge/src/v2/__tests__/typed-message-buffer.test.ts @@ -0,0 +1,278 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { createTypedMessageBuffer } from '../client/typed-message-buffer.js'; +import { UserMessage } from '../connection/types.js'; + +const createMessage = (type: string, data: unknown): UserMessage => ({ + type, + data, + timestamp: Date.now(), +}); + +describe('TypedMessageBuffer', () => { + describe('Handler registration', () => { + it('should deliver messages immediately when handler exists', () => { + const buffer = createTypedMessageBuffer(); + const received: UserMessage[] = []; + + buffer.onMessage('test', (msg) => received.push(msg)); + buffer.handleMessage(createMessage('test', { value: 1 })); + + expect(received).toHaveLength(1); + expect(received[0].data).toEqual({ value: 1 }); + + buffer.close(); + }); + + it('should buffer messages when no handler exists', () => { + const buffer = createTypedMessageBuffer(); + + buffer.handleMessage(createMessage('test', { value: 1 })); + buffer.handleMessage(createMessage('test', { value: 2 })); + + expect(buffer.getBufferedCount('test')).toBe(2); + expect(buffer.getTotalBufferedCount()).toBe(2); + + buffer.close(); + }); + + it('should replay buffered messages when first handler is registered', async () => { + const buffer = createTypedMessageBuffer(); + const received: UserMessage[] = []; + + // Buffer some messages + buffer.handleMessage(createMessage('test', { value: 1 })); + buffer.handleMessage(createMessage('test', { value: 2 })); + buffer.handleMessage(createMessage('test', { value: 3 })); + + expect(buffer.getBufferedCount('test')).toBe(3); + + // Register handler - should trigger replay + buffer.onMessage('test', (msg) => received.push(msg)); + + // Wait for replay (happens on next tick) + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(received).toHaveLength(3); + expect(received[0].data).toEqual({ value: 1 }); + expect(received[1].data).toEqual({ value: 2 }); + expect(received[2].data).toEqual({ value: 3 }); + + // Buffer should be empty after replay + expect(buffer.getBufferedCount('test')).toBe(0); + + buffer.close(); + }); + + it('should not replay to subsequent handlers for the same type', async () => { + const buffer = createTypedMessageBuffer(); + const handler1Received: UserMessage[] = []; + const handler2Received: UserMessage[] = []; + + // Buffer message + buffer.handleMessage(createMessage('test', { value: 'buffered' })); + + // First handler gets replay + buffer.onMessage('test', (msg) => handler1Received.push(msg)); + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Second handler doesn't get replay + buffer.onMessage('test', (msg) => handler2Received.push(msg)); + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(handler1Received).toHaveLength(1); + expect(handler2Received).toHaveLength(0); + + // New message goes to both + buffer.handleMessage(createMessage('test', { value: 'new' })); + + expect(handler1Received).toHaveLength(2); + expect(handler2Received).toHaveLength(1); + + buffer.close(); + }); + + it('should buffer different types independently', async () => { + const buffer = createTypedMessageBuffer(); + const typeAReceived: UserMessage[] = []; + const typeBReceived: UserMessage[] = []; + + // Buffer messages of different types + buffer.handleMessage(createMessage('type-a', { a: 1 })); + buffer.handleMessage(createMessage('type-b', { b: 1 })); + buffer.handleMessage(createMessage('type-a', { a: 2 })); + + expect(buffer.getBufferedCount('type-a')).toBe(2); + expect(buffer.getBufferedCount('type-b')).toBe(1); + + // Register handler for type-a only + buffer.onMessage('type-a', (msg) => typeAReceived.push(msg)); + await new Promise((resolve) => setTimeout(resolve, 10)); + + // type-a replayed, type-b still buffered + expect(typeAReceived).toHaveLength(2); + expect(buffer.getBufferedCount('type-a')).toBe(0); + expect(buffer.getBufferedCount('type-b')).toBe(1); + + // Now register handler for type-b + buffer.onMessage('type-b', (msg) => typeBReceived.push(msg)); + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(typeBReceived).toHaveLength(1); + expect(buffer.getBufferedCount('type-b')).toBe(0); + + buffer.close(); + }); + }); + + describe('Buffer limits', () => { + it('should respect maxPerType limit', () => { + const buffer = createTypedMessageBuffer({ maxPerType: 3 }); + + for (let i = 0; i < 5; i++) { + buffer.handleMessage(createMessage('test', { index: i })); + } + + expect(buffer.getBufferedCount('test')).toBe(3); + + buffer.close(); + }); + + it('should respect maxTotal limit', () => { + const buffer = createTypedMessageBuffer({ maxTotal: 5 }); + + for (let i = 0; i < 3; i++) { + buffer.handleMessage(createMessage('type-a', { index: i })); + } + for (let i = 0; i < 5; i++) { + buffer.handleMessage(createMessage('type-b', { index: i })); + } + + expect(buffer.getTotalBufferedCount()).toBe(5); + + buffer.close(); + }); + + it('should drop old messages when buffer overflows', async () => { + const buffer = createTypedMessageBuffer({ maxPerType: 3 }); + const received: UserMessage[] = []; + + // Add 5 messages, only last 3 should be kept + for (let i = 0; i < 5; i++) { + buffer.handleMessage(createMessage('test', { index: i })); + } + + buffer.onMessage('test', (msg) => received.push(msg)); + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(received).toHaveLength(3); + expect((received[0].data as any).index).toBe(2); + expect((received[1].data as any).index).toBe(3); + expect((received[2].data as any).index).toBe(4); + + buffer.close(); + }); + + it('should drop messages older than maxAgeMs', async () => { + const buffer = createTypedMessageBuffer({ maxAgeMs: 50 }); + + buffer.handleMessage(createMessage('test', { value: 'old' })); + + // Wait for message to become stale + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Stale message should be cleaned up when we check + expect(buffer.getBufferedCount('test')).toBe(0); + + buffer.close(); + }); + }); + + describe('Handler removal', () => { + it('should stop delivering to removed handlers', () => { + const buffer = createTypedMessageBuffer(); + const received: UserMessage[] = []; + + const sub = buffer.onMessage('test', (msg) => received.push(msg)); + buffer.handleMessage(createMessage('test', { value: 1 })); + + expect(received).toHaveLength(1); + + sub.remove(); + + buffer.handleMessage(createMessage('test', { value: 2 })); + + expect(received).toHaveLength(1); + + buffer.close(); + }); + + it('should buffer again if all handlers are removed', () => { + const buffer = createTypedMessageBuffer(); + const received: UserMessage[] = []; + + const sub = buffer.onMessage('test', (msg) => received.push(msg)); + buffer.handleMessage(createMessage('test', { value: 1 })); + + sub.remove(); + + // New message should be buffered (no handlers) + buffer.handleMessage(createMessage('test', { value: 2 })); + + // Note: messages are only buffered until first handler is registered + // After that, they go to handlers or are dropped if no handlers + // This test verifies they're not delivered to removed handlers + expect(received).toHaveLength(1); + + buffer.close(); + }); + }); + + describe('Error handling', () => { + it('should catch handler errors and continue to other handlers', () => { + const buffer = createTypedMessageBuffer(); + const received: UserMessage[] = []; + const consoleError = vi.spyOn(console, 'error').mockImplementation(() => {}); + + buffer.onMessage('test', () => { + throw new Error('Handler error'); + }); + buffer.onMessage('test', (msg) => received.push(msg)); + + buffer.handleMessage(createMessage('test', { value: 1 })); + + expect(received).toHaveLength(1); + expect(consoleError).toHaveBeenCalled(); + + consoleError.mockRestore(); + buffer.close(); + }); + }); + + describe('Close', () => { + it('should clear all state on close', () => { + const buffer = createTypedMessageBuffer(); + + buffer.handleMessage(createMessage('test', { value: 1 })); + buffer.onMessage('test', () => {}); + + buffer.close(); + + expect(buffer.getTotalBufferedCount()).toBe(0); + + buffer.close(); + }); + + it('should not process messages after close', () => { + const buffer = createTypedMessageBuffer(); + const received: UserMessage[] = []; + + buffer.onMessage('test', (msg) => received.push(msg)); + buffer.close(); + + buffer.handleMessage(createMessage('test', { value: 1 })); + + expect(received).toHaveLength(0); + }); + }); +}); + diff --git a/packages/plugin-bridge/src/v2/client/client.ts b/packages/plugin-bridge/src/v2/client/client.ts new file mode 100644 index 00000000..8c1a4922 --- /dev/null +++ b/packages/plugin-bridge/src/v2/client/client.ts @@ -0,0 +1,133 @@ +import { Channel } from '../../channel/types.js'; +import { Subscription } from '../../types.js'; +import { + createHandshakeConnection, + createBufferedConnection, + Connection, + WireMessage, + UserMessage, + TypedBufferConfig, +} from '../connection/index.js'; +import { createTypedMessageBuffer, TypedMessageBuffer } from './typed-message-buffer.js'; +import { RozeniteDevToolsClient } from './types.js'; + +type ClientState = { + connection: Connection; + messageBuffer: TypedMessageBuffer; + connectionSubscription: Subscription | null; + closed: boolean; +}; + +type CreateClientOptions = { + channel: Channel; + pluginId: string; + isLeader: boolean; + bufferConfig?: TypedBufferConfig; +}; + +/** + * Internal client implementation. + * Composes: + * - HandshakeConnection (protocol) + * - BufferedConnection (outgoing queue during handshake) + * - TypedMessageBuffer (incoming per-type buffering with replay) + */ +export const createClientInternal = < + TEventMap extends Record = Record +>( + options: CreateClientOptions +): RozeniteDevToolsClient => { + const { channel, pluginId, isLeader, bufferConfig } = options; + + // Layer 1: Handshake connection (handles protocol) + const handshakeConnection = createHandshakeConnection(channel, { + pluginId, + isLeader, + autoStart: true, + }); + + // Layer 2: Buffered connection for outgoing messages during handshake + const bufferedConnection = createBufferedConnection(handshakeConnection, { + maxQueueSize: bufferConfig?.maxTotal ?? 1000, + overflowStrategy: 'drop-oldest', + }); + + // Layer 3: Typed message buffer for incoming messages + const messageBuffer = createTypedMessageBuffer(bufferConfig); + + const state: ClientState = { + connection: bufferedConnection, + messageBuffer, + connectionSubscription: null, + closed: false, + }; + + // Route incoming wire messages to typed buffer + const handleIncomingMessage = (wireMessage: unknown): void => { + if (state.closed) return; + + const wire = wireMessage as WireMessage; + + // Convert wire message to user message + const userMessage: UserMessage = { + type: wire.type, + data: wire.data, + timestamp: wire.timestamp, + }; + + // Let the typed buffer handle routing/buffering + state.messageBuffer.handleMessage(userMessage); + }; + + // Subscribe to connection messages + state.connectionSubscription = state.connection.onMessage(handleIncomingMessage); + + // Public API + + const send = ( + type: TType, + data: TEventMap[TType] + ): void => { + if (state.closed) { + console.warn('[Client] Attempted to send on closed client'); + return; + } + + // Send as wire message with timestamp + state.connection.send({ + type: type as string, + data, + timestamp: Date.now(), + }); + }; + + const onMessage = ( + type: TType, + handler: (message: UserMessage) => void + ): Subscription => { + return state.messageBuffer.onMessage(type as string, handler); + }; + + const onReady = (callback: () => void): Subscription => { + return state.connection.onReady(callback); + }; + + const isReady = (): boolean => { + return state.connection.isReady(); + }; + + const close = (): void => { + state.closed = true; + state.connectionSubscription?.remove(); + state.messageBuffer.close(); + state.connection.close(); + }; + + return { + send, + onMessage, + onReady, + isReady, + close, + }; +}; diff --git a/packages/plugin-bridge/src/v2/client/factory.ts b/packages/plugin-bridge/src/v2/client/factory.ts new file mode 100644 index 00000000..8c2669fc --- /dev/null +++ b/packages/plugin-bridge/src/v2/client/factory.ts @@ -0,0 +1,59 @@ +import { getChannel, isLeader as detectLeader } from '../../channel/factory.js'; +import { createClientInternal } from './client.js'; +import { RozeniteClientConfig, RozeniteDevToolsClient } from './types.js'; + +/** + * Creates a Rozenite DevTools client with the specified configuration. + * + * This is the main entry point for creating clients. + * It internally composes the connection layers (handshake + buffering) + * and returns a clean, typed client API. + * + * Messages are automatically buffered per-type and replayed when handlers + * are registered, ensuring no messages are lost. + * + * @example + * ```typescript + * type MyEvents = { + * 'user-action': { action: string }; + * 'state-update': { state: object }; + * }; + * + * const client = await createClient({ + * pluginId: 'my-plugin', + * }); + * + * // Messages are buffered until this handler is registered + * // Then all buffered 'state-update' messages are replayed + * client.onMessage('state-update', (message) => { + * console.log('State updated:', message.data, 'at', message.timestamp); + * }); + * + * client.send('user-action', { action: 'click' }); + * ``` + */ +export const createClient = async < + TEventMap extends Record = Record +>( + config: RozeniteClientConfig +): Promise> => { + const { + pluginId, + channel: providedChannel, + isLeader: providedIsLeader, + buffer, + } = config; + + // Use provided channel or get from factory + const channel = providedChannel ?? (await getChannel()); + + // Use provided isLeader or detect from environment + const leader = providedIsLeader ?? detectLeader(); + + return createClientInternal({ + channel, + pluginId, + isLeader: leader, + bufferConfig: buffer, + }); +}; diff --git a/packages/plugin-bridge/src/v2/client/index.ts b/packages/plugin-bridge/src/v2/client/index.ts new file mode 100644 index 00000000..a37b3088 --- /dev/null +++ b/packages/plugin-bridge/src/v2/client/index.ts @@ -0,0 +1,8 @@ +export { createClient } from './factory.js'; +export { createClientInternal } from './client.js'; +export { createTypedMessageBuffer } from './typed-message-buffer.js'; +export type { TypedMessageBuffer } from './typed-message-buffer.js'; +export type { + RozeniteDevToolsClient, + RozeniteClientConfig, +} from './types.js'; diff --git a/packages/plugin-bridge/src/v2/client/typed-message-buffer.ts b/packages/plugin-bridge/src/v2/client/typed-message-buffer.ts new file mode 100644 index 00000000..39ce45b0 --- /dev/null +++ b/packages/plugin-bridge/src/v2/client/typed-message-buffer.ts @@ -0,0 +1,242 @@ +import { Subscription } from '../../types.js'; +import { TypedBufferConfig, UserMessage } from '../connection/types.js'; + +type MessageHandler = (message: UserMessage) => void; + +type TypeState = { + handlers: Set; + hasReceivedFirstHandler: boolean; +}; + +type BufferedMessage = { + type: string; + message: UserMessage; + receivedAt: number; +}; + +type TypedMessageBufferState = { + typeStates: Map; + bufferedMessages: BufferedMessage[]; + config: Required; + closed: boolean; +}; + +const DEFAULT_CONFIG: Required = { + maxPerType: 100, + maxTotal: 1000, + maxAgeMs: 30_000, // 30 seconds +}; + +/** + * TypedMessageBuffer buffers messages by type and replays them when + * the first handler is registered for that type. + * + * Features: + * - Messages are buffered per-type until a handler is registered + * - When first handler for a type is added, all buffered messages of that type are replayed + * - Subsequent handlers for the same type only receive new messages + * - Configurable limits: max per type, max total, max age + * - Automatic cleanup of stale messages + */ +export type TypedMessageBuffer = { + /** + * Handle an incoming message. + * If handlers exist for this type, deliver immediately. + * Otherwise, buffer for later replay. + */ + handleMessage: (message: UserMessage) => void; + + /** + * Register a handler for a message type. + * If this is the first handler for this type, replay buffered messages. + */ + onMessage: ( + type: string, + handler: MessageHandler + ) => Subscription; + + /** + * Get the number of buffered messages for a type. + */ + getBufferedCount: (type: string) => number; + + /** + * Get the total number of buffered messages. + */ + getTotalBufferedCount: () => number; + + /** + * Clear all buffered messages and handlers. + */ + close: () => void; +}; + +/** + * Creates a typed message buffer for per-type buffering with replay semantics. + */ +export const createTypedMessageBuffer = ( + config: TypedBufferConfig = {} +): TypedMessageBuffer => { + const state: TypedMessageBufferState = { + typeStates: new Map(), + bufferedMessages: [], + config: { ...DEFAULT_CONFIG, ...config }, + closed: false, + }; + + const getOrCreateTypeState = (type: string): TypeState => { + let typeState = state.typeStates.get(type); + if (!typeState) { + typeState = { + handlers: new Set(), + hasReceivedFirstHandler: false, + }; + state.typeStates.set(type, typeState); + } + return typeState; + }; + + const cleanupStaleMessages = (): void => { + const now = Date.now(); + const maxAge = state.config.maxAgeMs; + + state.bufferedMessages = state.bufferedMessages.filter( + (msg) => now - msg.receivedAt < maxAge + ); + }; + + const enforceBufferLimits = (type: string): void => { + // Clean up stale messages first + cleanupStaleMessages(); + + // Enforce per-type limit + const typeMessages = state.bufferedMessages.filter((m) => m.type === type); + if (typeMessages.length > state.config.maxPerType) { + const toRemove = typeMessages.length - state.config.maxPerType; + let removed = 0; + state.bufferedMessages = state.bufferedMessages.filter((m) => { + if (m.type === type && removed < toRemove) { + removed++; + return false; + } + return true; + }); + } + + // Enforce total limit + if (state.bufferedMessages.length > state.config.maxTotal) { + const toRemove = state.bufferedMessages.length - state.config.maxTotal; + state.bufferedMessages = state.bufferedMessages.slice(toRemove); + } + }; + + const bufferMessage = (message: UserMessage): void => { + state.bufferedMessages.push({ + type: message.type, + message, + receivedAt: Date.now(), + }); + + enforceBufferLimits(message.type); + }; + + const deliverToHandlers = (typeState: TypeState, message: UserMessage): void => { + typeState.handlers.forEach((handler) => { + try { + handler(message); + } catch (error) { + console.error(`[TypedMessageBuffer] Handler error for type "${message.type}":`, error); + } + }); + }; + + const replayBufferedMessages = (type: string, typeState: TypeState): void => { + // Clean up stale messages before replay + cleanupStaleMessages(); + + // Get messages for this type in order + const messagesToReplay = state.bufferedMessages + .filter((m) => m.type === type) + .map((m) => m.message); + + // Remove replayed messages from buffer + state.bufferedMessages = state.bufferedMessages.filter((m) => m.type !== type); + + // Deliver to handlers + messagesToReplay.forEach((message) => { + deliverToHandlers(typeState, message); + }); + }; + + // Public API + + const handleMessage = (message: UserMessage): void => { + if (state.closed) return; + + const typeState = state.typeStates.get(message.type); + + if (typeState && typeState.handlers.size > 0) { + // Handlers exist - deliver immediately + deliverToHandlers(typeState, message); + } else { + // No handlers - buffer for later + bufferMessage(message); + } + }; + + const onMessage = ( + type: string, + handler: MessageHandler + ): Subscription => { + if (state.closed) { + return { remove: () => {} }; + } + + const typeState = getOrCreateTypeState(type); + const isFirstHandler = !typeState.hasReceivedFirstHandler; + + typeState.handlers.add(handler as MessageHandler); + + // If this is the first handler for this type, replay buffered messages + if (isFirstHandler) { + typeState.hasReceivedFirstHandler = true; + // Replay on next tick to allow caller to complete setup + setTimeout(() => { + if (!state.closed) { + replayBufferedMessages(type, typeState); + } + }, 0); + } + + return { + remove: () => { + typeState.handlers.delete(handler as MessageHandler); + }, + }; + }; + + const getBufferedCount = (type: string): number => { + cleanupStaleMessages(); + return state.bufferedMessages.filter((m) => m.type === type).length; + }; + + const getTotalBufferedCount = (): number => { + cleanupStaleMessages(); + return state.bufferedMessages.length; + }; + + const close = (): void => { + state.closed = true; + state.typeStates.clear(); + state.bufferedMessages = []; + }; + + return { + handleMessage, + onMessage, + getBufferedCount, + getTotalBufferedCount, + close, + }; +}; + diff --git a/packages/plugin-bridge/src/v2/client/types.ts b/packages/plugin-bridge/src/v2/client/types.ts new file mode 100644 index 00000000..b2dfcd8a --- /dev/null +++ b/packages/plugin-bridge/src/v2/client/types.ts @@ -0,0 +1,78 @@ +import { Subscription } from '../../types.js'; +import { Channel } from '../../channel/types.js'; +import { TypedBufferConfig, UserMessage } from '../connection/types.js'; + +/** + * Rozenite DevTools client. + * + * Provides typed bidirectional communication between device and DevTools panel. + * Messages are automatically buffered per-type and replayed when handlers are registered. + */ +export type RozeniteDevToolsClient< + TEventMap extends Record = Record +> = { + /** + * Send a typed message to the other side. + * Messages are timestamped automatically. + */ + send: ( + type: TType, + data: TEventMap[TType] + ) => void; + + /** + * Register a handler for a specific message type. + * + * When the first handler for a type is registered, any buffered messages + * of that type are replayed to the handler. + * + * Subsequent handlers for the same type only receive new messages. + */ + onMessage: ( + type: TType, + handler: (message: UserMessage) => void + ) => Subscription; + + /** + * Register a callback to be called when the connection is ready. + * If already ready, the callback is called on the next tick. + */ + onReady: (callback: () => void) => Subscription; + + /** + * Check if the connection is ready for communication. + * Messages can be sent before ready - they will be queued. + */ + isReady: () => boolean; + + /** + * Close the client and release all resources. + */ + close: () => void; +}; + +/** + * Configuration for creating a client. + */ +export type RozeniteClientConfig = { + /** + * Unique identifier for the plugin. + */ + pluginId: string; + + /** + * Optional: provide a custom channel for testing. + */ + channel?: Channel; + + /** + * Optional: specify leader/follower role. + * If not provided, detected from environment. + */ + isLeader?: boolean; + + /** + * Optional: configuration for message buffering. + */ + buffer?: TypedBufferConfig; +}; diff --git a/packages/plugin-bridge/src/v2/connection/buffered-connection.ts b/packages/plugin-bridge/src/v2/connection/buffered-connection.ts new file mode 100644 index 00000000..07ec87e9 --- /dev/null +++ b/packages/plugin-bridge/src/v2/connection/buffered-connection.ts @@ -0,0 +1,166 @@ +import { Subscription } from '../../types.js'; +import { Connection, BufferedConnectionConfig } from './types.js'; + +type MessageListener = (message: unknown) => void; + +type BufferedConnectionState = { + outgoingQueue: unknown[]; + incomingQueue: unknown[]; + messageListeners: Set; + connectionSubscription: Subscription | null; + readySubscription: Subscription | null; + closed: boolean; +}; + +const DEFAULT_MAX_QUEUE_SIZE = 1000; +const DEFAULT_OVERFLOW_STRATEGY = 'drop-oldest'; + +/** + * Creates a buffered connection that queues messages until the underlying connection is ready. + * + * This is a generic facade/decorator that works with ANY Connection. + * It doesn't know about handshakes - it just uses the connection's isReady() state. + * + * Features: + * - Queues outgoing messages until connection is ready + * - Queues incoming messages until connection is ready + * - Flushes queues when connection becomes ready + * - Configurable queue size limits and overflow strategies + * - Handles reconnection (queues again if connection becomes not-ready) + */ +export const createBufferedConnection = ( + connection: Connection, + config: BufferedConnectionConfig = {} +): Connection => { + const maxQueueSize = config.maxQueueSize ?? DEFAULT_MAX_QUEUE_SIZE; + const overflowStrategy = config.overflowStrategy ?? DEFAULT_OVERFLOW_STRATEGY; + + const state: BufferedConnectionState = { + outgoingQueue: [], + incomingQueue: [], + messageListeners: new Set(), + connectionSubscription: null, + readySubscription: null, + closed: false, + }; + + const handleQueueOverflow = (queue: T[], item: T, queueName: string): void => { + if (queue.length >= maxQueueSize) { + switch (overflowStrategy) { + case 'drop-oldest': + queue.shift(); + break; + case 'drop-newest': + return; // Don't add the new item + case 'throw': + throw new Error(`[BufferedConnection] ${queueName} queue overflow (max: ${maxQueueSize})`); + } + } + queue.push(item); + }; + + const notifyListeners = (message: unknown): void => { + state.messageListeners.forEach((listener) => { + try { + listener(message); + } catch (error) { + console.error('[BufferedConnection] Listener error:', error); + } + }); + }; + + const flushQueues = (): void => { + // Flush outgoing queue + while (state.outgoingQueue.length > 0) { + const message = state.outgoingQueue.shift()!; + connection.send(message); + } + + // Flush incoming queue + while (state.incomingQueue.length > 0) { + const message = state.incomingQueue.shift()!; + notifyListeners(message); + } + }; + + const handleIncomingMessage = (message: unknown): void => { + if (state.closed) return; + + if (connection.isReady()) { + // Connection ready - deliver immediately + notifyListeners(message); + } else { + // Queue until ready + handleQueueOverflow(state.incomingQueue, message, 'incoming'); + } + }; + + // Public API + + const send = (message: unknown): void => { + if (state.closed) { + console.warn('[BufferedConnection] Attempted to send on closed connection'); + return; + } + + if (connection.isReady()) { + // Connection ready - send immediately + connection.send(message); + } else { + // Queue until ready + handleQueueOverflow(state.outgoingQueue, message, 'outgoing'); + } + }; + + const onMessage = (listener: MessageListener): Subscription => { + state.messageListeners.add(listener); + + return { + remove: () => { + state.messageListeners.delete(listener); + }, + }; + }; + + const isReady = (): boolean => { + return connection.isReady(); + }; + + const onReady = (callback: () => void): Subscription => { + return connection.onReady(callback); + }; + + const close = (): void => { + state.closed = true; + state.connectionSubscription?.remove(); + state.readySubscription?.remove(); + state.messageListeners.clear(); + state.outgoingQueue = []; + state.incomingQueue = []; + connection.close(); + }; + + // Initialize: subscribe to underlying connection + state.connectionSubscription = connection.onMessage(handleIncomingMessage); + + // Subscribe to ready events to flush queues + state.readySubscription = connection.onReady(() => { + if (!state.closed) { + flushQueues(); + } + }); + + // If already ready, flush immediately + if (connection.isReady()) { + flushQueues(); + } + + return { + send, + onMessage, + isReady, + onReady, + close, + }; +}; + diff --git a/packages/plugin-bridge/src/v2/connection/handshake-connection.ts b/packages/plugin-bridge/src/v2/connection/handshake-connection.ts new file mode 100644 index 00000000..bc340f96 --- /dev/null +++ b/packages/plugin-bridge/src/v2/connection/handshake-connection.ts @@ -0,0 +1,236 @@ +import { Channel } from '../../channel/types.js'; +import { Subscription } from '../../types.js'; +import { + Connection, + HandshakeConnectionConfig, + HandshakeState, + HandshakeStateType, + HANDSHAKE_INIT, + HANDSHAKE_ACK, + HANDSHAKE_COMPLETE, + isHandshakeMessage, + isWireMessage, + WireMessage, +} from './types.js'; + +type MessageListener = (message: WireMessage) => void; + +type HandshakeConnectionState = { + handshakeState: HandshakeStateType; + channelSubscription: Subscription | null; + messageListeners: Set; + readyListeners: Set<() => void>; + closed: boolean; +}; + +/** + * Creates a connection that implements the handshake protocol. + * + * Protocol: + * 1. Leader sends INIT + * 2. Follower responds with ACK + * 3. Leader sends COMPLETE + * 4. Both sides are now ready + * + * This layer ONLY handles the handshake protocol and message routing. + * It does NOT buffer messages - that's the responsibility of BufferedConnection. + * + * Messages sent before ready will be silently dropped (use BufferedConnection to queue them). + */ +type HandshakeConnection = Connection & { + signalReady: () => void; +}; + +export const createHandshakeConnection = ( + channel: Channel, + config: HandshakeConnectionConfig +): HandshakeConnection => { + const { pluginId, isLeader, autoStart = true } = config; + + const state: HandshakeConnectionState = { + handshakeState: HandshakeState.NOT_STARTED, + channelSubscription: null, + messageListeners: new Set(), + readyListeners: new Set(), + closed: false, + }; + + const sendHandshakeMessage = (type: typeof HANDSHAKE_INIT | typeof HANDSHAKE_ACK | typeof HANDSHAKE_COMPLETE): void => { + channel.send({ + pluginId, + type, + payload: null, + }); + }; + + const notifyReady = (): void => { + state.readyListeners.forEach((callback) => { + // Use setTimeout to ensure callbacks don't block + setTimeout(callback, 0); + }); + }; + + const handleHandshakeMessage = (message: { type: string; pluginId: string }): void => { + switch (message.type) { + case HANDSHAKE_INIT: + if (!isLeader) { + // Follower receives INIT + // Always respond to INIT, even if already ready (handles leader reload) + if (state.handshakeState === HandshakeState.READY) { + // Leader reconnected - reset our ready state + state.handshakeState = HandshakeState.WAITING_FOR_COMPLETE; + } else { + state.handshakeState = HandshakeState.WAITING_FOR_COMPLETE; + } + sendHandshakeMessage(HANDSHAKE_ACK); + } + break; + + case HANDSHAKE_ACK: + if (isLeader) { + if (state.handshakeState === HandshakeState.WAITING_FOR_ACK) { + // Normal flow: we sent INIT, received ACK, send COMPLETE + sendHandshakeMessage(HANDSHAKE_COMPLETE); + state.handshakeState = HandshakeState.READY; + notifyReady(); + } else if (state.handshakeState === HandshakeState.READY) { + // Follower reconnected - just send COMPLETE again + sendHandshakeMessage(HANDSHAKE_COMPLETE); + } + } + break; + + case HANDSHAKE_COMPLETE: + if (!isLeader) { + if (state.handshakeState === HandshakeState.WAITING_FOR_COMPLETE) { + state.handshakeState = HandshakeState.READY; + notifyReady(); + } + } + break; + } + }; + + const handleIncomingMessage = (rawMessage: unknown): void => { + if (state.closed) return; + + // Parse and validate message structure + if (typeof rawMessage !== 'object' || rawMessage === null) { + return; + } + + const msg = rawMessage as Record; + if (msg.pluginId !== pluginId) { + return; // Not for us + } + + if (isHandshakeMessage(rawMessage)) { + handleHandshakeMessage(rawMessage); + } else if (isWireMessage(rawMessage)) { + // Forward user messages to listeners + // Note: We forward even if not ready - it's up to the caller to handle this + state.messageListeners.forEach((listener) => { + try { + listener(rawMessage); + } catch (error) { + console.error('[HandshakeConnection] Listener error:', error); + } + }); + } + }; + + // Public API + + const send = (message: unknown): void => { + if (state.closed) { + console.warn('[HandshakeConnection] Attempted to send on closed connection'); + return; + } + + // Note: We allow sending even if not ready. + // BufferedConnection will queue messages; raw usage drops them. + channel.send({ + pluginId, + ...(message as object), + }); + }; + + const onMessage = (listener: MessageListener): Subscription => { + state.messageListeners.add(listener); + + return { + remove: () => { + state.messageListeners.delete(listener); + }, + }; + }; + + const isReady = (): boolean => { + return state.handshakeState === HandshakeState.READY; + }; + + const onReady = (callback: () => void): Subscription => { + if (state.handshakeState === HandshakeState.READY) { + // Already ready, call on next tick + setTimeout(callback, 0); + } + + state.readyListeners.add(callback); + + return { + remove: () => { + state.readyListeners.delete(callback); + }, + }; + }; + + const close = (): void => { + state.closed = true; + state.channelSubscription?.remove(); + state.messageListeners.clear(); + state.readyListeners.clear(); + state.handshakeState = HandshakeState.NOT_STARTED; + }; + + /** + * Start the handshake process. + * Leader sends INIT; follower just waits for INIT. + */ + const startHandshake = (): void => { + if (state.closed) return; + + if (state.handshakeState !== HandshakeState.NOT_STARTED && + state.handshakeState !== HandshakeState.READY) { + // Already in progress + return; + } + + if (isLeader) { + state.handshakeState = HandshakeState.WAITING_FOR_ACK; + sendHandshakeMessage(HANDSHAKE_INIT); + } + // Follower doesn't do anything - just waits for INIT + }; + + // Initialize: subscribe to channel + state.channelSubscription = channel.onMessage(handleIncomingMessage); + + // Auto-start handshake on next tick if configured + if (autoStart) { + setTimeout(() => { + if (!state.closed) { + startHandshake(); + } + }, 0); + } + + return { + send, + onMessage, + isReady, + onReady, + close, + signalReady: startHandshake, + }; +}; + diff --git a/packages/plugin-bridge/src/v2/connection/index.ts b/packages/plugin-bridge/src/v2/connection/index.ts new file mode 100644 index 00000000..ac2a0cc7 --- /dev/null +++ b/packages/plugin-bridge/src/v2/connection/index.ts @@ -0,0 +1,21 @@ +export { createHandshakeConnection } from './handshake-connection.js'; +export { createBufferedConnection } from './buffered-connection.js'; +export type { + Connection, + HandshakeConnectionConfig, + BufferedConnectionConfig, + TypedBufferConfig, + UserMessage, + WireMessage, + HandshakeMessage, + HandshakeMessageType, + HandshakeStateType, +} from './types.js'; +export { + HANDSHAKE_INIT, + HANDSHAKE_ACK, + HANDSHAKE_COMPLETE, + HandshakeState, + isHandshakeMessage, + isWireMessage, +} from './types.js'; diff --git a/packages/plugin-bridge/src/v2/connection/types.ts b/packages/plugin-bridge/src/v2/connection/types.ts new file mode 100644 index 00000000..1db23b53 --- /dev/null +++ b/packages/plugin-bridge/src/v2/connection/types.ts @@ -0,0 +1,189 @@ +import { Subscription } from '../../types.js'; +import { Channel } from '../../channel/types.js'; + +/** + * Connection extends Channel with ready-state semantics. + * A connection knows when it's ready to reliably send/receive messages. + */ +export type Connection = Channel & { + /** + * Returns true if the connection is ready for communication. + */ + isReady: () => boolean; + + /** + * Registers a callback to be called when the connection becomes ready. + * If already ready, the callback is called on the next tick. + * Returns a subscription that can be used to unsubscribe. + */ + onReady: (callback: () => void) => Subscription; + + /** + * Signal that this side is ready to start the handshake. + * Only needed if autoStart is false. + */ + signalReady?: () => void; +}; + +/** + * Configuration for handshake connection + */ +export type HandshakeConnectionConfig = { + /** + * Unique identifier for the plugin using this connection. + * Messages are filtered by pluginId. + */ + pluginId: string; + + /** + * Whether this side is the leader (initiates handshake). + * Typically the DevTools panel is the leader. + */ + isLeader: boolean; + + /** + * Whether to start the handshake automatically. + * If false, you must call signalReady() to start. + * Defaults to true. + */ + autoStart?: boolean; + + /** + * Optional timeout for handshake in milliseconds. + * If not provided, no timeout is applied. + */ + handshakeTimeoutMs?: number; +}; + +/** + * Configuration for buffered connection + */ +export type BufferedConnectionConfig = { + /** + * Maximum number of messages to queue before dropping. + * Defaults to 1000. + */ + maxQueueSize?: number; + + /** + * What to do when queue is full. + * - 'drop-oldest': Remove oldest message and add new one + * - 'drop-newest': Ignore new message + * - 'throw': Throw an error + * Defaults to 'drop-oldest'. + */ + overflowStrategy?: 'drop-oldest' | 'drop-newest' | 'throw'; +}; + +/** + * Handshake protocol message types + */ +export const HANDSHAKE_INIT = '__HANDSHAKE_INIT__' as const; +export const HANDSHAKE_ACK = '__HANDSHAKE_ACK__' as const; +export const HANDSHAKE_COMPLETE = '__HANDSHAKE_COMPLETE__' as const; + +export type HandshakeMessageType = + | typeof HANDSHAKE_INIT + | typeof HANDSHAKE_ACK + | typeof HANDSHAKE_COMPLETE; + +export type HandshakeMessage = { + type: HandshakeMessageType; + pluginId: string; + payload: null; +}; + +/** + * Handshake state machine states + */ +export const HandshakeState = { + NOT_STARTED: 'not_started', + WAITING_FOR_ACK: 'waiting_for_ack', + WAITING_FOR_COMPLETE: 'waiting_for_complete', + READY: 'ready', +} as const; + +export type HandshakeStateType = (typeof HandshakeState)[keyof typeof HandshakeState]; + +/** + * Type guard for handshake messages + */ +export const isHandshakeMessage = (message: unknown): message is HandshakeMessage => { + if (typeof message !== 'object' || message === null) { + return false; + } + + const msg = message as Record; + if (typeof msg.type !== 'string' || typeof msg.pluginId !== 'string') { + return false; + } + + return ( + msg.type === HANDSHAKE_INIT || + msg.type === HANDSHAKE_ACK || + msg.type === HANDSHAKE_COMPLETE + ); +}; + +/** + * Wire format for user messages (v2). + * This is the structure sent over the channel. + */ +export type WireMessage = { + pluginId: string; + type: string; + timestamp: number; + data: unknown; +}; + +/** + * User message as delivered to handlers. + * Contains the message type, data, and metadata. + */ +export type UserMessage = { + type: string; + data: T; + timestamp: number; +}; + +/** + * Type guard for wire messages (v2 format) + */ +export const isWireMessage = (message: unknown): message is WireMessage => { + if (typeof message !== 'object' || message === null) { + return false; + } + + const msg = message as Record; + return ( + typeof msg.pluginId === 'string' && + typeof msg.type === 'string' && + typeof msg.timestamp === 'number' && + 'data' in msg && + !isHandshakeMessage(message) + ); +}; + +/** + * Configuration for typed message buffer + */ +export type TypedBufferConfig = { + /** + * Maximum messages to buffer per type. + * Defaults to 100. + */ + maxPerType?: number; + + /** + * Maximum total messages across all types. + * Defaults to 1000. + */ + maxTotal?: number; + + /** + * Maximum age of buffered messages in milliseconds. + * Messages older than this are dropped. + * Defaults to 30000 (30 seconds). + */ + maxAgeMs?: number; +}; diff --git a/packages/plugin-bridge/src/v2/index.ts b/packages/plugin-bridge/src/v2/index.ts new file mode 100644 index 00000000..b898b4db --- /dev/null +++ b/packages/plugin-bridge/src/v2/index.ts @@ -0,0 +1,36 @@ +// Client API +export { + useRozeniteDevToolsClient, + useRozeniteDevToolsClientInternal, +} from './useRozeniteDevToolsClient.js'; +export type { + UseRozeniteDevToolsClientOptions, + UseRozeniteDevToolsClientInternalOptions, +} from './useRozeniteDevToolsClient.js'; + +export { createClient } from './client/index.js'; +export type { + RozeniteDevToolsClient, + RozeniteClientConfig, +} from './client/types.js'; + +// Message types +export type { + UserMessage, + TypedBufferConfig, +} from './connection/types.js'; + +// Connection API (for advanced usage) +export { + createHandshakeConnection, + createBufferedConnection, +} from './connection/index.js'; +export type { + Connection, + HandshakeConnectionConfig, + BufferedConnectionConfig, +} from './connection/types.js'; + +// Typed buffer (for advanced usage) +export { createTypedMessageBuffer } from './client/index.js'; +export type { TypedMessageBuffer } from './client/index.js'; diff --git a/packages/plugin-bridge/src/v2/test-utils/index.ts b/packages/plugin-bridge/src/v2/test-utils/index.ts new file mode 100644 index 00000000..08e3b047 --- /dev/null +++ b/packages/plugin-bridge/src/v2/test-utils/index.ts @@ -0,0 +1,8 @@ +export type { MockChannel } from './mock-channel.js'; +export { + createMockChannel, + createMockChannelPair, + wait, + waitFor, + waitForBothReady, +} from './mock-channel.js'; diff --git a/packages/plugin-bridge/src/v2/test-utils/mock-channel.ts b/packages/plugin-bridge/src/v2/test-utils/mock-channel.ts new file mode 100644 index 00000000..990523e6 --- /dev/null +++ b/packages/plugin-bridge/src/v2/test-utils/mock-channel.ts @@ -0,0 +1,221 @@ +import { Channel } from '../../channel/types.js'; +import { Subscription } from '../../types.js'; + +type MessageListener = (message: unknown) => void; + +type MockChannelState = { + listeners: Set; + peerReceive: ((message: unknown) => void) | null; + messageLog: unknown[]; + closed: boolean; +}; + +/** + * Mock channel implementation for testing. + * Can be connected to another MockChannel to simulate bidirectional communication. + */ +export type MockChannel = Channel & { + /** + * Connect this channel to another channel for bidirectional communication. + */ + connect: (otherChannel: MockChannel) => void; + + /** + * Check if there are any listeners registered. + */ + hasListeners: () => boolean; + + /** + * Get the number of registered listeners. + */ + getListenerCount: () => number; + + /** + * Get all messages that have been sent through this channel. + */ + getMessageLog: () => unknown[]; + + /** + * Clear the message log. + */ + clearMessageLog: () => void; + + /** + * Internal method for receiving messages from peer. + */ + receive: (message: unknown) => void; + + /** + * Internal state access for connection. + */ + state: MockChannelState; +}; + +/** + * Create a mock channel for testing. + * Messages are delivered asynchronously (via setTimeout) to simulate real conditions. + */ +export const createMockChannel = (): MockChannel => { + const state: MockChannelState = { + listeners: new Set(), + peerReceive: null, + messageLog: [], + closed: false, + }; + + const connect = (otherChannel: MockChannel): void => { + state.peerReceive = otherChannel.receive; + otherChannel.state.peerReceive = receive; + }; + + const send = (message: unknown): void => { + if (state.closed) { + console.warn('[MockChannel] Attempted to send on closed channel'); + return; + } + + state.messageLog.push(message); + + if (state.peerReceive) { + const peerReceive = state.peerReceive; + // Simulate async message delivery + setTimeout(() => { + peerReceive(message); + }, 0); + } + }; + + const receive = (message: unknown): void => { + if (state.closed) return; + + state.listeners.forEach((listener) => { + try { + listener(message); + } catch (error) { + console.error('[MockChannel] Listener error:', error); + } + }); + }; + + const onMessage = (listener: MessageListener): Subscription => { + state.listeners.add(listener); + + return { + remove: () => { + state.listeners.delete(listener); + }, + }; + }; + + const close = (): void => { + state.closed = true; + state.listeners.clear(); + state.peerReceive = null; + }; + + const hasListeners = (): boolean => { + return state.listeners.size > 0; + }; + + const getListenerCount = (): number => { + return state.listeners.size; + }; + + const getMessageLog = (): unknown[] => { + return [...state.messageLog]; + }; + + const clearMessageLog = (): void => { + state.messageLog = []; + }; + + return { + send, + onMessage, + close, + connect, + hasListeners, + getListenerCount, + getMessageLog, + clearMessageLog, + receive, + state, + }; +}; + +/** + * Create a pair of connected mock channels for testing bidirectional communication. + * + * @returns [channelA, channelB] - Two connected channels + */ +export const createMockChannelPair = (): [MockChannel, MockChannel] => { + const channelA = createMockChannel(); + const channelB = createMockChannel(); + + // Set up bidirectional communication + channelA.state.peerReceive = channelB.receive; + channelB.state.peerReceive = channelA.receive; + + return [channelA, channelB]; +}; + +/** + * Wait for a specified number of milliseconds. + * Useful for waiting for async message delivery in tests. + */ +export const wait = (ms: number): Promise => { + return new Promise((resolve) => setTimeout(resolve, ms)); +}; + +/** + * Wait for a condition to become true. + * + * @param condition - Function that returns true when condition is met + * @param timeout - Maximum time to wait in milliseconds (default: 1000) + * @param interval - Check interval in milliseconds (default: 10) + */ +export const waitFor = async ( + condition: () => boolean, + timeout = 1000, + interval = 10 +): Promise => { + const start = Date.now(); + + while (!condition()) { + if (Date.now() - start > timeout) { + throw new Error(`waitFor timeout after ${timeout}ms`); + } + await wait(interval); + } +}; + +/** + * Wait for both clients to be ready. + */ +export const waitForBothReady = async ( + clientA: { isReady: () => boolean; onReady: (cb: () => void) => { remove: () => void } }, + clientB: { isReady: () => boolean; onReady: (cb: () => void) => { remove: () => void } } +): Promise => { + await Promise.all([ + new Promise((resolve) => { + if (clientA.isReady()) { + resolve(); + } else { + const sub = clientA.onReady(() => { + sub.remove(); + resolve(); + }); + } + }), + new Promise((resolve) => { + if (clientB.isReady()) { + resolve(); + } else { + const sub = clientB.onReady(() => { + sub.remove(); + resolve(); + }); + } + }), + ]); +}; diff --git a/packages/plugin-bridge/src/v2/useRozeniteDevToolsClient.ts b/packages/plugin-bridge/src/v2/useRozeniteDevToolsClient.ts new file mode 100644 index 00000000..6eddc7c4 --- /dev/null +++ b/packages/plugin-bridge/src/v2/useRozeniteDevToolsClient.ts @@ -0,0 +1,167 @@ +import { useEffect, useState } from 'react'; +import { createClient } from './client/index.js'; +import { RozeniteDevToolsClient, RozeniteClientConfig } from './client/types.js'; +import { TypedBufferConfig } from './connection/types.js'; +import { UnsupportedPlatformError } from '../errors.js'; + +/** + * Options for the useRozeniteDevToolsClient hook. + */ +export type UseRozeniteDevToolsClientOptions = { + /** + * Unique identifier for the plugin. + */ + pluginId: string; + + /** + * Optional: configuration for message buffering. + */ + buffer?: TypedBufferConfig; +}; + +/** + * Internal API - includes transport layer options for testing. + */ +export type UseRozeniteDevToolsClientInternalOptions = UseRozeniteDevToolsClientOptions & + Pick; + +/** + * Internal hook - accepts transport layer options for testing. + * + * @internal + */ +export const useRozeniteDevToolsClientInternal = < + TEventMap extends Record = Record +>({ + pluginId, + buffer, + channel, + isLeader, +}: UseRozeniteDevToolsClientInternalOptions): RozeniteDevToolsClient | null => { + const [client, setClient] = useState | null>(null); + const [error, setError] = useState(null); + + useEffect(() => { + let isMounted = true; + let clientInstance: RozeniteDevToolsClient | null = null; + + const setup = async () => { + try { + clientInstance = await createClient({ + pluginId, + buffer, + channel, + isLeader, + }); + + if (isMounted) { + // Wait for handshake to complete before exposing client + const readyPromise = new Promise((resolve) => { + if (clientInstance && clientInstance.isReady()) { + resolve(); + } else if (clientInstance) { + const subscription = clientInstance.onReady(() => { + subscription.remove(); + resolve(); + }); + } + }); + + await readyPromise; + + if (isMounted) { + setClient(clientInstance); + } + } + } catch (err) { + if (err instanceof UnsupportedPlatformError) { + // Expected on unsupported platforms - don't show error + console.warn( + `[Rozenite, ${pluginId}] Unsupported platform, skipping setup.` + ); + return; + } + + console.error('[Rozenite] Error setting up client:', err); + + if (isMounted) { + setError(err); + } + } + }; + + const teardown = () => { + try { + if (clientInstance != null) { + clientInstance.close(); + } + } catch { + // Ignore errors during teardown + } + }; + + setup(); + + return () => { + isMounted = false; + teardown(); + }; + }, [pluginId, buffer, channel, isLeader]); + + if (error != null) { + throw error; + } + + return client; +}; + +/** + * React hook to create and manage a Rozenite DevTools client. + * + * Returns null until the connection is ready. + * + * Messages are automatically buffered per-type and replayed when handlers + * are registered, ensuring no messages are lost regardless of registration timing. + * + * @example + * ```tsx + * type MyEvents = { + * 'user-action': { action: string }; + * 'state-update': { state: object }; + * }; + * + * const MyComponent = () => { + * const client = useRozeniteDevToolsClient({ + * pluginId: 'my-plugin', + * }); + * + * useEffect(() => { + * if (!client) return; + * + * // Handler receives all buffered 'state-update' messages + future ones + * const sub = client.onMessage('state-update', (message) => { + * console.log('State:', message.data, 'sent at:', message.timestamp); + * }); + * + * return () => sub.remove(); + * }, [client]); + * + * const handleClick = () => { + * client?.send('user-action', { action: 'click' }); + * }; + * + * return ( + * + * ); + * }; + * ``` + */ +export const useRozeniteDevToolsClient = < + TEventMap extends Record = Record +>( + options: UseRozeniteDevToolsClientOptions +): RozeniteDevToolsClient | null => { + return useRozeniteDevToolsClientInternal(options); +};