diff --git a/.changeset/empty-carrots-stare.md b/.changeset/empty-carrots-stare.md
new file mode 100644
index 000000000..7aa423d4f
--- /dev/null
+++ b/.changeset/empty-carrots-stare.md
@@ -0,0 +1,5 @@
+---
+"@livekit/agents": patch
+---
+
+Implemented FallbackAdapter for LLM
diff --git a/agents/src/llm/fallback_adapter.test.ts b/agents/src/llm/fallback_adapter.test.ts
new file mode 100644
index 000000000..84f3a9141
--- /dev/null
+++ b/agents/src/llm/fallback_adapter.test.ts
@@ -0,0 +1,238 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+import { beforeAll, describe, expect, it, vi } from 'vitest';
+import { APIConnectionError, APIError } from '../_exceptions.js';
+import { initializeLogger } from '../log.js';
+import type { APIConnectOptions } from '../types.js';
+import { delay } from '../utils.js';
+import type { ChatContext } from './chat_context.js';
+import { FallbackAdapter } from './fallback_adapter.js';
+import { type ChatChunk, LLM, LLMStream } from './llm.js';
+import type { ToolChoice, ToolContext } from './tool_context.js';
+
+class MockLLMStream extends LLMStream {
+  public myLLM: LLM;
+
+  constructor(
+    llm: LLM,
+    opts: {
+      chatCtx: ChatContext;
+      toolCtx?: ToolContext;
+      connOptions: APIConnectOptions;
+    },
+    private shouldFail: boolean = false,
+    private failAfterChunks: number = 0,
+  ) {
+    super(llm, opts);
+    this.myLLM = llm;
+  }
+
+  protected async run(): Promise<void> {
+    if (this.shouldFail && this.failAfterChunks === 0) {
+      throw new APIError('Mock LLM failed immediately');
+    }
+
+    const chunk: ChatChunk = {
+      id: 'test-id',
+      delta: { role: 'assistant', content: 'chunk' },
+    };
+
+    for (let i = 0; i < 3; i++) {
+      if (this.shouldFail && i === this.failAfterChunks) {
+        throw new APIError('Mock LLM failed after chunks');
+      }
+      this.queue.put(chunk);
+      await delay(10);
+    }
+  }
+}
+
+class MockLLM extends LLM {
+  shouldFail: boolean = false;
+  failAfterChunks: number = 0;
+  private _label: string;
+
+  constructor(label: string) {
+    super();
+    this._label = label;
+  }
+
+  label(): string {
+    return this._label;
+  }
+
+  chat(opts: {
+    chatCtx: ChatContext;
+    toolCtx?: ToolContext;
+    connOptions?: APIConnectOptions;
+    parallelToolCalls?: boolean;
+    toolChoice?: ToolChoice;
+    extraKwargs?: Record<string, unknown>;
+  }): LLMStream {
+    return new MockLLMStream(
+      this,
+      {
+        chatCtx: opts.chatCtx,
+        toolCtx: opts.toolCtx,
+        connOptions: opts.connOptions!,
+      },
+      this.shouldFail,
+      this.failAfterChunks,
+    );
+  }
+}
+
+describe('FallbackAdapter', () => {
+  beforeAll(() => {
+    initializeLogger({ pretty: false });
+    // Suppress unhandled rejections from LLMStream background tasks
+    process.on('unhandledRejection', () => {});
+  });
+
+  it('should initialize correctly', () => {
+    const llm1 = new MockLLM('llm1');
+    const adapter = new FallbackAdapter({ llms: [llm1] });
+    expect(adapter.llms).toHaveLength(1);
+    expect(adapter.llms[0]).toBe(llm1);
+  });
+
+  it('should throw if no LLMs provided', () => {
+    expect(() => new FallbackAdapter({ llms: [] })).toThrow();
+  });
+
+  it('should use primary LLM if successful', async () => {
+    const llm1 = new MockLLM('llm1');
+    const llm2 = new MockLLM('llm2');
+    const adapter = new FallbackAdapter({ llms: [llm1, llm2] });
+
+    const stream = adapter.chat({
+      chatCtx: {} as ChatContext,
+    });
+
+    const chunks: ChatChunk[] = [];
+    for await (const chunk of stream) {
+      chunks.push(chunk);
+    }
+
+    expect(chunks).toHaveLength(3);
+    // Should verify it used llm1 (we can check logs or spy, but simple success is good first step)
+  });
+
+  it('should fall back to second LLM if first fails immediately', async () => {
+    const llm1 = new MockLLM('llm1');
+    llm1.shouldFail = true;
+    const llm2 = new MockLLM('llm2');
+    const adapter = new FallbackAdapter({ llms: [llm1, llm2] });
+
+    const stream = adapter.chat({
+      chatCtx: {} as ChatContext,
+    });
+
+    const chunks: ChatChunk[] = [];
+    for await (const chunk of stream) {
+      chunks.push(chunk);
+    }
+
+    expect(chunks).toHaveLength(3);
+    expect(adapter._status[0]!.available).toBe(false);
+    expect(adapter._status[1]!.available).toBe(true);
+  });
+
+  it('should fail if all LLMs fail', async () => {
+    const llm1 = new MockLLM('llm1');
+    llm1.shouldFail = true;
+    const llm2 = new MockLLM('llm2');
+    llm2.shouldFail = true;
+    const adapter = new FallbackAdapter({ llms: [llm1, llm2] });
+
+    const stream = adapter.chat({
+      chatCtx: {} as ChatContext,
+    });
+
+    const errorPromise = new Promise((resolve) => {
+      adapter.on('error', (e) => resolve(e.error));
+    });
+
+    for await (const _ of stream) {
+      // consume
+    }
+
+    const error = await errorPromise;
+    expect(error).toBeInstanceOf(APIConnectionError);
+  });
+
+  it('should fail if chunks sent and retryOnChunkSent is false', async () => {
+    const llm1 = new MockLLM('llm1');
+    llm1.shouldFail = true;
+    llm1.failAfterChunks = 1; // Fail after 1 chunk
+    const llm2 = new MockLLM('llm2');
+    const adapter = new FallbackAdapter({
+      llms: [llm1, llm2],
+      retryOnChunkSent: false,
+    });
+
+    const stream = adapter.chat({
+      chatCtx: {} as ChatContext,
+    });
+
+    const errorPromise = new Promise((resolve) => {
+      adapter.on('error', (e) => resolve(e.error));
+    });
+
+    for await (const _ of stream) {
+      // consume
+    }
+
+    const error = await errorPromise;
+    expect(error).toBeInstanceOf(APIError);
+  });
+
+  it('should fall back if chunks sent and retryOnChunkSent is true', async () => {
+    const llm1 = new MockLLM('llm1');
+    llm1.shouldFail = true;
+    llm1.failAfterChunks = 1;
+    const llm2 = new MockLLM('llm2');
+    const adapter = new FallbackAdapter({
+      llms: [llm1, llm2],
+      retryOnChunkSent: true,
+    });
+
+    const stream = adapter.chat({
+      chatCtx: {} as ChatContext,
+    });
+
+    const chunks: ChatChunk[] = [];
+    for await (const chunk of stream) {
+      chunks.push(chunk);
+    }
+
+    // 1 chunk from failed llm1 + 3 chunks from llm2
+    expect(chunks).toHaveLength(4);
+  });
+
+  it('should emit availability changed events', async () => {
+    const llm1 = new MockLLM('llm1');
+    llm1.shouldFail = true;
+    const llm2 = new MockLLM('llm2');
+    const adapter = new FallbackAdapter({ llms: [llm1, llm2] });
+
+    const eventSpy = vi.fn();
+    (adapter as any).on('llm_availability_changed', eventSpy);
+
+    const stream = adapter.chat({
+      chatCtx: {} as ChatContext,
+    });
+
+    for await (const _ of stream) {
+      // consume
+    }
+
+    expect(eventSpy).toHaveBeenCalledWith(
+      expect.objectContaining({
+        llm: llm1,
+        available: false,
+      }),
+    );
+  });
+});
diff --git a/agents/src/llm/fallback_adapter.ts b/agents/src/llm/fallback_adapter.ts
new file mode 100644
index 000000000..6d9f83ea0
--- /dev/null
+++ b/agents/src/llm/fallback_adapter.ts
@@ -0,0 +1,391 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+import { APIConnectionError, APIError } from '../_exceptions.js';
+import { log } from '../log.js';
+import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS } from '../types.js';
+import type { ChatContext } from './chat_context.js';
+import type { ChatChunk } from './llm.js';
+import { LLM, LLMStream } from './llm.js';
+import type { ToolChoice, ToolContext } from './tool_context.js';
+
+/**
+ * Default connection options for FallbackAdapter.
+ * Uses maxRetry=0 since the fallback handles retries at a higher level.
+ */
+const DEFAULT_FALLBACK_API_CONNECT_OPTIONS: APIConnectOptions = {
+  maxRetry: 0,
+  timeoutMs: DEFAULT_API_CONNECT_OPTIONS.timeoutMs,
+  retryIntervalMs: DEFAULT_API_CONNECT_OPTIONS.retryIntervalMs,
+};
+
+/**
+ * Internal status tracking for each LLM instance.
+ */
+interface LLMStatus {
+  available: boolean;
+  recoveringTask: Promise<void> | null;
+}
+
+/**
+ * Event emitted when an LLM's availability changes.
+ */
+export interface AvailabilityChangedEvent {
+  llm: LLM;
+  available: boolean;
+}
+
+/**
+ * Options for creating a FallbackAdapter.
+ */
+export interface FallbackAdapterOptions {
+  /** List of LLM instances to fall back to (in order). */
+  llms: LLM[];
+  /** Timeout for each LLM attempt in seconds. Defaults to 5.0. */
+  attemptTimeout?: number;
+  /** Internal retries per LLM before moving to the next one. Defaults to 0. */
+  maxRetryPerLLM?: number;
+  /** Interval between retries in seconds. Defaults to 0.5. */
+  retryInterval?: number;
+  /** Whether to retry when an LLM fails after chunks are sent. Defaults to false. */
+  retryOnChunkSent?: boolean;
+}
+
+/**
+ * FallbackAdapter is an LLM that can fall back to a different LLM if the current LLM fails.
+ *
+ * @example
+ * ```typescript
+ * const fallbackLLM = new FallbackAdapter({
+ *   llms: [primaryLLM, secondaryLLM, tertiaryLLM],
+ *   attemptTimeout: 5.0,
+ *   maxRetryPerLLM: 1,
+ * });
+ * ```
+ */
+export class FallbackAdapter extends LLM {
+  readonly llms: LLM[];
+  readonly attemptTimeout: number;
+  readonly maxRetryPerLLM: number;
+  readonly retryInterval: number;
+  readonly retryOnChunkSent: boolean;
+
+  /** @internal */
+  _status: LLMStatus[];
+
+  private logger = log();
+
+  constructor(options: FallbackAdapterOptions) {
+    super();
+
+    if (!options.llms || options.llms.length < 1) {
+      throw new Error('at least one LLM instance must be provided.');
+    }
+
+    this.llms = options.llms;
+    this.attemptTimeout = options.attemptTimeout ?? 5.0;
+    this.maxRetryPerLLM = options.maxRetryPerLLM ?? 0;
+    this.retryInterval = options.retryInterval ?? 0.5;
+    this.retryOnChunkSent = options.retryOnChunkSent ?? false;
+
+    // Initialize status for each LLM
+    this._status = this.llms.map(() => ({
+      available: true,
+      recoveringTask: null,
+    }));
+
+    // Forward metrics_collected events from child LLMs
+    for (const llm of this.llms) {
+      llm.on('metrics_collected', (metrics) => {
+        this.emit('metrics_collected', metrics);
+      });
+    }
+  }
+
+  get model(): string {
+    return 'FallbackAdapter';
+  }
+
+  label(): string {
+    return 'FallbackAdapter';
+  }
+
+  chat(opts: {
+    chatCtx: ChatContext;
+    toolCtx?: ToolContext;
+    connOptions?: APIConnectOptions;
+    parallelToolCalls?: boolean;
+    toolChoice?: ToolChoice;
+    extraKwargs?: Record<string, unknown>;
+  }): LLMStream {
+    return new FallbackLLMStream(this, {
+      chatCtx: opts.chatCtx,
+      toolCtx: opts.toolCtx,
+      connOptions: opts.connOptions || DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
+      parallelToolCalls: opts.parallelToolCalls,
+      toolChoice: opts.toolChoice,
+      extraKwargs: opts.extraKwargs,
+    });
+  }
+
+  /**
+   * Emit availability changed event.
+   * @internal
+   */
+  _emitAvailabilityChanged(llm: LLM, available: boolean): void {
+    const event: AvailabilityChangedEvent = { llm, available };
+    // Use type assertion for custom event
+    (this as unknown as { emit: (event: string, data: AvailabilityChangedEvent) => void }).emit(
+      'llm_availability_changed',
+      event,
+    );
+  }
+}
+
+/**
+ * LLMStream implementation for FallbackAdapter.
+ * Handles fallback logic between multiple LLM providers.
+ */
+class FallbackLLMStream extends LLMStream {
+  private adapter: FallbackAdapter;
+  private parallelToolCalls?: boolean;
+  private toolChoice?: ToolChoice;
+  private extraKwargs?: Record<string, unknown>;
+  private _currentStream?: LLMStream;
+  private _log = log();
+
+  constructor(
+    adapter: FallbackAdapter,
+    opts: {
+      chatCtx: ChatContext;
+      toolCtx?: ToolContext;
+      connOptions: APIConnectOptions;
+      parallelToolCalls?: boolean;
+      toolChoice?: ToolChoice;
+      extraKwargs?: Record<string, unknown>;
+    },
+  ) {
+    super(adapter, {
+      chatCtx: opts.chatCtx,
+      toolCtx: opts.toolCtx,
+      connOptions: opts.connOptions,
+    });
+    this.adapter = adapter;
+    this.parallelToolCalls = opts.parallelToolCalls;
+    this.toolChoice = opts.toolChoice;
+    this.extraKwargs = opts.extraKwargs;
+  }
+
+  /**
+   * Override chatCtx to return current stream's context if available.
+   */
+  override get chatCtx(): ChatContext {
+    return this._currentStream?.chatCtx ?? super.chatCtx;
+  }
+
+  /**
+   * Try to generate with a single LLM.
+   * Returns an async generator that yields chunks.
+   */
+  private async *tryGenerate(
+    llm: LLM,
+    checkRecovery: boolean = false,
+  ): AsyncGenerator<ChatChunk> {
+    const connOptions: APIConnectOptions = {
+      ...this.connOptions,
+      maxRetry: this.adapter.maxRetryPerLLM,
+      timeoutMs: this.adapter.attemptTimeout * 1000,
+      retryIntervalMs: this.adapter.retryInterval * 1000,
+    };
+
+    const stream = llm.chat({
+      chatCtx: super.chatCtx,
+      toolCtx: this.toolCtx,
+      connOptions,
+      parallelToolCalls: this.parallelToolCalls,
+      toolChoice: this.toolChoice,
+      extraKwargs: this.extraKwargs,
+    });
+
+    // Listen for error events - child LLMs emit errors via their LLM instance, not the stream
+    let streamError: Error | undefined;
+    const errorHandler = (ev: { error: Error }) => {
+      streamError = ev.error;
+    };
+    llm.on('error', errorHandler);
+
+    try {
+      let shouldSetCurrent = !checkRecovery;
+      for await (const chunk of stream) {
+        if (shouldSetCurrent) {
+          shouldSetCurrent = false;
+          this._currentStream = stream;
+        }
+        yield chunk;
+      }
+
+      // If an error was emitted but not thrown through iteration, throw it now
+      if (streamError) {
+        throw streamError;
+      }
+    } catch (error) {
+      if (error instanceof APIError) {
+        if (checkRecovery) {
+          this._log.warn({ llm: llm.label(), error }, 'recovery failed');
+        } else {
+          this._log.warn({ llm: llm.label(), error }, 'failed, switching to next LLM');
+        }
+        throw error;
+      }
+
+      // Handle timeout errors
+      if (error instanceof Error && error.name === 'AbortError') {
+        if (checkRecovery) {
+          this._log.warn({ llm: llm.label() }, 'recovery timed out');
+        } else {
+          this._log.warn({ llm: llm.label() }, 'timed out, switching to next LLM');
+        }
+        throw error;
+      }
+
+      // Unexpected error
+      if (checkRecovery) {
+        this._log.error({ llm: llm.label(), error }, 'recovery unexpected error');
+      } else {
+        this._log.error({ llm: llm.label(), error }, 'unexpected error, switching to next LLM');
+      }
+      throw error;
+    } finally {
+      llm.off('error', errorHandler);
+    }
+  }
+
+  /**
+   * Start background recovery task for an LLM.
+   */
+  private tryRecovery(llm: LLM, index: number): void {
+    const status = this.adapter._status[index]!;
+
+    // Skip if already recovering
+    if (status.recoveringTask !== null) {
+      return;
+    }
+
+    const recoverTask = async (): Promise<void> => {
+      try {
+        // Try to generate (just iterate to check if it works)
+        // eslint-disable-next-line @typescript-eslint/no-unused-vars
+        for await (const _chunk of this.tryGenerate(llm, true)) {
+          // Just consume the stream to verify it works
+        }
+
+        // Recovery successful
+        status.available = true;
+        this._log.info({ llm: llm.label() }, 'LLM recovered');
+        this.adapter._emitAvailabilityChanged(llm, true);
+      } catch {
+        // Recovery failed, stay unavailable
+      } finally {
+        status.recoveringTask = null;
+      }
+    };
+
+    // Fire and forget
+    status.recoveringTask = recoverTask();
+  }
+
+  /**
+   * Main run method - iterates through LLMs with fallback logic.
+   */
+  protected async run(): Promise<void> {
+    const startTime = Date.now();
+
+    // Check if all LLMs are unavailable
+    const allFailed = this.adapter._status.every((s) => !s.available);
+    if (allFailed) {
+      this._log.error('all LLMs are unavailable, retrying...');
+    }
+
+    for (let i = 0; i < this.adapter.llms.length; i++) {
+      const llm = this.adapter.llms[i]!;
+      const status = this.adapter._status[i]!;
+
+      this._log.debug(
+        { llm: llm.label(), index: i, available: status.available, allFailed },
+        'checking LLM',
+      );
+
+      if (status.available || allFailed) {
+        let textSent = '';
+        const toolCallsSent: string[] = [];
+
+        try {
+          this._log.info({ llm: llm.label() }, 'FallbackAdapter: Attempting provider');
+
+          let chunkCount = 0;
+          for await (const chunk of this.tryGenerate(llm, false)) {
+            chunkCount++;
+            // Track what's been sent
+            if (chunk.delta) {
+              if (chunk.delta.content) {
+                textSent += chunk.delta.content;
+              }
+              if (chunk.delta.toolCalls) {
+                for (const tc of chunk.delta.toolCalls) {
+                  if (tc.name) {
+                    toolCallsSent.push(tc.name);
+                  }
+                }
+              }
+            }
+
+            // Forward chunk to queue
+            this._log.debug({ llm: llm.label(), chunkCount }, 'run: forwarding chunk to queue');
+            this.queue.put(chunk);
+          }
+
+          // Success!
+          this._log.info(
+            { llm: llm.label(), totalChunks: chunkCount, textLength: textSent.length },
+            'FallbackAdapter: Provider succeeded',
+          );
+          return;
+        } catch (error) {
+          // Mark as unavailable if it was available before
+          if (status.available) {
+            status.available = false;
+            this.adapter._emitAvailabilityChanged(llm, false);
+          }

+          // Check if we sent data before failing
+          if (textSent || toolCallsSent.length > 0) {
+            const extra = { textSent, toolCallsSent };
+
+            if (!this.adapter.retryOnChunkSent) {
+              this._log.error(
+                { llm: llm.label(), ...extra },
+                'failed after sending chunk, skip retrying. Set `retryOnChunkSent` to `true` to enable.',
+              );
+              throw error;
+            }
+
+            this._log.warn(
+              { llm: llm.label(), ...extra },
+              'failed after sending chunk, retrying...',
+            );
+          }
+        }
+      }
+
+      // Trigger background recovery for this LLM
+      this.tryRecovery(llm, i);
+    }
+
+    // All LLMs failed
+    const duration = (Date.now() - startTime) / 1000;
+    const labels = this.adapter.llms.map((l) => l.label()).join(', ');
+    throw new APIConnectionError({
+      message: `all LLMs failed (${labels}) after ${duration.toFixed(2)}s`,
+    });
+  }
+}
diff --git a/agents/src/llm/index.ts b/agents/src/llm/index.ts
index b33d5d64d..df99e96a1 100644
--- a/agents/src/llm/index.ts
+++ b/agents/src/llm/index.ts
@@ -66,3 +66,9 @@ export {
   toJsonSchema,
   type OpenAIFunctionParameters,
 } from './utils.js';
+
+export {
+  FallbackAdapter,
+  type AvailabilityChangedEvent,
+  type FallbackAdapterOptions,
+} from './fallback_adapter.js';
diff --git a/agents/src/llm/llm.ts b/agents/src/llm/llm.ts
index 746eddd7c..0eb6cea45 100644
--- a/agents/src/llm/llm.ts
+++ b/agents/src/llm/llm.ts
@@ -135,7 +135,7 @@ export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
     // is run **after** the constructor has finished. Otherwise we get
     // runtime error when trying to access class variables in the
     // `run` method.
-    startSoon(() => this.mainTask().then(() => this.queue.close()));
+    startSoon(() => this.mainTask().finally(() => this.queue.close()));
   }
 
   private _mainTaskImpl = async (span: Span) => {
diff --git a/examples/src/llm_fallback_adapter.ts b/examples/src/llm_fallback_adapter.ts
new file mode 100644
index 000000000..dc9060114
--- /dev/null
+++ b/examples/src/llm_fallback_adapter.ts
@@ -0,0 +1,107 @@
+// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+/**
+ * This example demonstrates the usage of the LLM FallbackAdapter.
+ *
+ * The FallbackAdapter allows you to configure multiple LLM providers and
+ * automatically fall back to the next provider if the current one fails.
+ * This improves reliability by ensuring your voice agent continues to work
+ * even if one LLM provider experiences downtime or errors.
+ *
+ * Key features:
+ * - Automatic failover between LLM providers
+ * - Background health recovery checks for failed providers
+ * - Configurable timeouts and retry behavior
+ * - Event emission when provider availability changes
+ */
+
+import {
+  type JobContext,
+  type JobProcess,
+  WorkerOptions,
+  cli,
+  defineAgent,
+  llm,
+  voice,
+} from '@livekit/agents';
+import * as deepgram from '@livekit/agents-plugin-deepgram';
+import * as elevenlabs from '@livekit/agents-plugin-elevenlabs';
+import * as openai from '@livekit/agents-plugin-openai';
+import * as silero from '@livekit/agents-plugin-silero';
+import { fileURLToPath } from 'node:url';
+import { z } from 'zod';
+
+export default defineAgent({
+  prewarm: async (proc: JobProcess) => {
+    proc.userData.vad = await silero.VAD.load();
+  },
+  entry: async (ctx: JobContext) => {
+    // Create multiple LLM instances for fallback
+    // The FallbackAdapter will try them in order: primary -> secondary -> tertiary
+    const primaryLLM = new openai.LLM({ model: 'gpt-4o' });
+    const secondaryLLM = new openai.LLM({ model: 'gpt-4o-mini' });
+    // You can mix different providers as well:
+    // const tertiaryLLM = new anthropic.LLM({ model: 'claude-3-5-sonnet' });
+
+    // Create the FallbackAdapter with your LLM instances
+    const fallbackLLM = new llm.FallbackAdapter({
+      llms: [primaryLLM, secondaryLLM],
+      // Optional configuration:
+      attemptTimeout: 10.0, // Timeout for each LLM attempt in seconds (default: 5.0)
+      maxRetryPerLLM: 1, // Number of retries per LLM before moving to next (default: 0)
+      retryInterval: 0.5, // Interval between retries in seconds (default: 0.5)
+      retryOnChunkSent: false, // Whether to retry if chunks were already sent (default: false)
+    });
+
+    // Listen for availability change events
+    // Note: Using type assertion since FallbackAdapter extends LLM but has additional events
+    (fallbackLLM as llm.FallbackAdapter).on(
+      'llm_availability_changed' as 'metrics_collected',
+      (event: unknown) => {
+        const e = event as llm.AvailabilityChangedEvent;
+        if (e.available) {
+          console.log(`LLM ${e.llm.label()} recovered and is now available`);
+        } else {
+          console.log(`LLM ${e.llm.label()} failed and is now unavailable`);
+        }
+      },
+    );
+
+    const agent = new voice.Agent({
+      instructions:
+        'You are a helpful assistant. Demonstrate that you are working by responding to user queries.',
+      tools: {
+        getWeather: llm.tool({
+          description: 'Get the weather for a given location.',
+          parameters: z.object({
+            location: z.string().describe('The location to get the weather for'),
+          }),
+          execute: async ({ location }) => {
+            return `The weather in ${location} is sunny with a temperature of 72°F.`;
+          },
+        }),
+      },
+    });
+
+    const session = new voice.AgentSession({
+      vad: ctx.proc.userData.vad! as silero.VAD,
+      stt: new deepgram.STT(),
+      tts: new elevenlabs.TTS(),
+      llm: fallbackLLM, // Use the FallbackAdapter instead of a single LLM
+    });
+
+    await session.start({
+      agent,
+      room: ctx.room,
+    });
+
+    session.say('Hello! I am a voice agent with LLM fallback support. How can I help you today?');
+
+    const participant = await ctx.waitForParticipant();
+    console.log('Participant joined:', participant.identity);
+  },
+});
+
+cli.runApp(new WorkerOptions({ agent: fileURLToPath(import.meta.url) }));