From 7595a172ada10253f72e3d23afb9a26e8c17cf22 Mon Sep 17 00:00:00 2001 From: krishnaShuk Date: Tue, 16 Dec 2025 19:26:39 +0530 Subject: [PATCH 1/7] feat(agents): Implement FallbackAdapter for LLM --- agents/src/llm/fallback_adapter.ts | 263 +++++++++++++++++++++++++++++ agents/src/llm/index.ts | 2 + 2 files changed, 265 insertions(+) create mode 100644 agents/src/llm/fallback_adapter.ts diff --git a/agents/src/llm/fallback_adapter.ts b/agents/src/llm/fallback_adapter.ts new file mode 100644 index 000000000..179ddb605 --- /dev/null +++ b/agents/src/llm/fallback_adapter.ts @@ -0,0 +1,263 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { APIConnectionError } from '../_exceptions.js'; +import { log } from '../log.js'; +import type { LLMMetrics } from '../metrics/base.js'; +import type { APIConnectOptions } from '../types.js'; +import type { ChatContext } from './chat_context.js'; +import { LLM, LLMStream } from './llm.js'; +import type { ToolChoice, ToolContext } from './tool_context.js'; + +export interface FallbackAdapterOptions { + llms: LLM[]; + attemptTimeout?: number; + maxRetryPerLLM?: number; + retryInterval?: number; + retryOnChunkSent?: boolean; +} + +interface LLMStatus { + available: boolean; + recoveringPromise?: Promise; +} + +export interface AvailabilityChangedEvent { + llm: LLM; + available: boolean; +} + +export type FallbackLLMCallbacks = { + metrics_collected: (metrics: LLMMetrics) => void; + llm_availability_changed: (event: AvailabilityChangedEvent) => void; + error: (error: Error) => void; +}; + +export class FallbackAdapter extends LLM { + public llms: LLM[]; + public options: Required>; + public status: Map; + + private _boundListeners: Map void>; + + constructor(options: FallbackAdapterOptions) { + super(); + if (options.llms.length < 1) { + throw new Error('At least one LLM instance must be provided.'); + } + + this.llms = options.llms; + this.options = { + attemptTimeout: options.attemptTimeout ?? 5.0, + maxRetryPerLLM: options.maxRetryPerLLM ?? 0, + retryInterval: options.retryInterval ?? 0.5, + retryOnChunkSent: options.retryOnChunkSent ?? 
false, + }; + + this.status = new Map(); + this._boundListeners = new Map(); + + this.llms.forEach((llm) => { + this.status.set(llm, { available: true }); + + const onMetrics = (metrics: LLMMetrics) => { + this.emit('metrics_collected', metrics); + }; + llm.on('metrics_collected', onMetrics); + this._boundListeners.set(llm, onMetrics); + }); + } + + get model(): string { + return 'FallbackAdapter'; + } + + get provider(): string { + return 'livekit'; + } + + label(): string { + return 'FallbackAdapter'; + } + + chat(opts: { + chatCtx: ChatContext; + toolCtx?: ToolContext; + connOptions?: APIConnectOptions; + parallelToolCalls?: boolean; + toolChoice?: ToolChoice; + extraKwargs?: Record; + }): LLMStream { + const effectiveOpts = { + timeoutMs: (this.options.attemptTimeout || 5) * 1000, + retryIntervalMs: (this.options.retryInterval || 0.5) * 1000, + ...(opts.connOptions || {}), + maxRetry: 0, + } as APIConnectOptions; + + return new FallbackLLMStream(this, { + ...opts, + connOptions: effectiveOpts, + }); + } + + async aclose(): Promise { + this.llms.forEach((llm) => { + const listener = this._boundListeners.get(llm); + if (listener) { + llm.off('metrics_collected', listener); + } + }); + this._boundListeners.clear(); + await super.aclose(); + } + + markFailed(llm: LLM, chatCtx: ChatContext) { + const s = this.status.get(llm); + + if (s && s.available) { + s.available = false; + + (this as any).emit('llm_availability_changed', { llm, available: false }); + + this.triggerRecovery(llm, chatCtx); + } + } + + private triggerRecovery(llm: LLM, chatCtx: ChatContext) { + const s = this.status.get(llm); + + if (!s || s.recoveringPromise) return; + + s.recoveringPromise = (async () => { + const logger = log(); + try { + await new Promise((resolve) => setTimeout(resolve, this.options.retryInterval * 1000)); + + logger.debug(`FallbackAdapter: Checking health of ${llm.label()}`); + + const stream = llm.chat({ + chatCtx: chatCtx, + connOptions: { + timeoutMs: 5000, + maxRetry: 0, + retryIntervalMs: 0, + }, + }); + + for await (const _ of stream) { + break; + } + + s.available = true; + (this as any).emit('llm_availability_changed', { llm, available: true }); + logger.info(`FallbackAdapter: Provider ${llm.label()} recovered.`); + } catch (e) { + logger.warn(`FallbackAdapter: Recovery check failed for ${llm.label()}`); + } finally { + s.recoveringPromise = undefined; + } + })(); + } +} + +class FallbackLLMStream extends LLMStream { + private adapter: FallbackAdapter; + private _currentStream?: LLMStream; + + constructor( + adapter: FallbackAdapter, + opts: { + chatCtx: ChatContext; + toolCtx?: ToolContext; + connOptions: APIConnectOptions; + parallelToolCalls?: boolean; + toolChoice?: ToolChoice; + extraKwargs?: Record; + }, + ) { + super(adapter, opts); + this.adapter = adapter; + } + + get chatCtx(): ChatContext { + return this._currentStream?.chatCtx ?? super.chatCtx; + } + + get toolCtx(): ToolContext | undefined { + return this._currentStream?.toolCtx ?? 
super.toolCtx; + } + + async run(): Promise { + const logger = log(); + const start = Date.now(); + + try { + const allFailed = Array.from(this.adapter.status.values()).every((s) => !s.available); + if (allFailed) { + logger.error('All LLMs are unavailable, retrying...'); + } + + let candidates = this.adapter.llms.filter((llm) => this.adapter.status.get(llm)?.available); + if (allFailed || candidates.length === 0) { + candidates = this.adapter.llms; + } + + for (const llm of candidates) { + let textSent = ''; + const toolCallsSent: string[] = []; + + try { + logger.debug({ label: llm.label() }, 'FallbackAdapter: Attempting provider'); + + const childStream = llm.chat({ + chatCtx: this.chatCtx, + toolCtx: this.toolCtx, + connOptions: { + ...this.connOptions, + timeoutMs: (this.adapter.options.attemptTimeout || 5) * 1000, + maxRetry: this.adapter.options.maxRetryPerLLM, + }, + }); + + this._currentStream = childStream; + + for await (const chunk of childStream) { + if (chunk.delta) { + if (chunk.delta.content) textSent += chunk.delta.content; + if (chunk.delta.toolCalls) { + chunk.delta.toolCalls.forEach((tc) => { + if (tc.name) toolCallsSent.push(tc.name); + }); + } + } + this.queue.put(chunk); + } + + logger.debug({ label: llm.label() }, 'FallbackAdapter: Provider succeeded'); + return; + } catch (error) { + const hasSentData = textSent.length > 0 || toolCallsSent.length > 0; + const logContext = { label: llm.label(), error, textSent, toolCallsSent }; + + if (hasSentData && !this.adapter.options.retryOnChunkSent) { + logger.error(logContext, 'Provider failed after sending data. Aborting fallback.'); + throw error; + } + + logger.warn(logContext, 'FallbackAdapter: Provider failed, switching...'); + this.adapter.markFailed(llm, this.chatCtx); + } finally { + this._currentStream = undefined; + } + } + + const duration = (Date.now() - start) / 1000; + throw new APIConnectionError({ + message: `All Fallback LLMs failed after ${duration}s`, + }); + } finally { + this.queue.close(); + } + } +} diff --git a/agents/src/llm/index.ts b/agents/src/llm/index.ts index b33d5d64d..ef22ee7da 100644 --- a/agents/src/llm/index.ts +++ b/agents/src/llm/index.ts @@ -66,3 +66,5 @@ export { toJsonSchema, type OpenAIFunctionParameters, } from './utils.js'; + +export { FallbackAdapter, type FallbackAdapterOptions } from './fallback_adapter.js'; From e8dc5c5c9fba0818a8b3fd14acd4a696f0830b79 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Mon, 5 Jan 2026 01:26:28 -0800 Subject: [PATCH 2/7] Create empty-carrots-stare.md --- .changeset/empty-carrots-stare.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/empty-carrots-stare.md diff --git a/.changeset/empty-carrots-stare.md b/.changeset/empty-carrots-stare.md new file mode 100644 index 000000000..7aa423d4f --- /dev/null +++ b/.changeset/empty-carrots-stare.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +Implemented FallbackAdapter for LLM From f80298cf2c96c791b78012d48029dfa4944d7d9f Mon Sep 17 00:00:00 2001 From: krishnaShuk Date: Wed, 7 Jan 2026 02:11:55 +0530 Subject: [PATCH 3/7] add example file --- examples/src/llm_fallback_adapter.ts | 107 +++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 examples/src/llm_fallback_adapter.ts diff --git a/examples/src/llm_fallback_adapter.ts b/examples/src/llm_fallback_adapter.ts new file mode 100644 index 000000000..dc9060114 --- /dev/null +++ b/examples/src/llm_fallback_adapter.ts @@ -0,0 +1,107 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
+// +// SPDX-License-Identifier: Apache-2.0 + +/** + * This example demonstrates the usage of the LLM FallbackAdapter. + * + * The FallbackAdapter allows you to configure multiple LLM providers and + * automatically fall back to the next provider if the current one fails. + * This improves reliability by ensuring your voice agent continues to work + * even if one LLM provider experiences downtime or errors. + * + * Key features: + * - Automatic failover between LLM providers + * - Background health recovery checks for failed providers + * - Configurable timeouts and retry behavior + * - Event emission when provider availability changes + */ + +import { + type JobContext, + type JobProcess, + WorkerOptions, + cli, + defineAgent, + llm, + voice, +} from '@livekit/agents'; +import * as deepgram from '@livekit/agents-plugin-deepgram'; +import * as elevenlabs from '@livekit/agents-plugin-elevenlabs'; +import * as openai from '@livekit/agents-plugin-openai'; +import * as silero from '@livekit/agents-plugin-silero'; +import { fileURLToPath } from 'node:url'; +import { z } from 'zod'; + +export default defineAgent({ + prewarm: async (proc: JobProcess) => { + proc.userData.vad = await silero.VAD.load(); + }, + entry: async (ctx: JobContext) => { + // Create multiple LLM instances for fallback + // The FallbackAdapter will try them in order: primary -> secondary -> tertiary + const primaryLLM = new openai.LLM({ model: 'gpt-4o' }); + const secondaryLLM = new openai.LLM({ model: 'gpt-4o-mini' }); + // You can mix different providers as well: + // const tertiaryLLM = new anthropic.LLM({ model: 'claude-3-5-sonnet' }); + + // Create the FallbackAdapter with your LLM instances + const fallbackLLM = new llm.FallbackAdapter({ + llms: [primaryLLM, secondaryLLM], + // Optional configuration: + attemptTimeout: 10.0, // Timeout for each LLM attempt in seconds (default: 5.0) + maxRetryPerLLM: 1, // Number of retries per LLM before moving to next (default: 0) + retryInterval: 0.5, // Interval between retries in seconds (default: 0.5) + retryOnChunkSent: false, // Whether to retry if chunks were already sent (default: false) + }); + + // Listen for availability change events + // Note: Using type assertion since FallbackAdapter extends LLM but has additional events + (fallbackLLM as llm.FallbackAdapter).on( + 'llm_availability_changed' as 'metrics_collected', + (event: unknown) => { + const e = event as llm.AvailabilityChangedEvent; + if (e.available) { + console.log(`LLM ${e.llm.label()} recovered and is now available`); + } else { + console.log(`LLM ${e.llm.label()} failed and is now unavailable`); + } + }, + ); + + const agent = new voice.Agent({ + instructions: + 'You are a helpful assistant. Demonstrate that you are working by responding to user queries.', + tools: { + getWeather: llm.tool({ + description: 'Get the weather for a given location.', + parameters: z.object({ + location: z.string().describe('The location to get the weather for'), + }), + execute: async ({ location }) => { + return `The weather in ${location} is sunny with a temperature of 72°F.`; + }, + }), + }, + }); + + const session = new voice.AgentSession({ + vad: ctx.proc.userData.vad! as silero.VAD, + stt: new deepgram.STT(), + tts: new elevenlabs.TTS(), + llm: fallbackLLM, // Use the FallbackAdapter instead of a single LLM + }); + + await session.start({ + agent, + room: ctx.room, + }); + + session.say('Hello! I am a voice agent with LLM fallback support. 
How can I help you today?'); + + const participant = await ctx.waitForParticipant(); + console.log('Participant joined:', participant.identity); + }, +}); + +cli.runApp(new WorkerOptions({ agent: fileURLToPath(import.meta.url) })); From c9736b28a0750d79a5dccedf639d680f90ac4e2d Mon Sep 17 00:00:00 2001 From: krishnaShuk Date: Wed, 7 Jan 2026 02:16:22 +0530 Subject: [PATCH 4/7] updated index.ts --- agents/src/llm/index.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/agents/src/llm/index.ts b/agents/src/llm/index.ts index ef22ee7da..df99e96a1 100644 --- a/agents/src/llm/index.ts +++ b/agents/src/llm/index.ts @@ -67,4 +67,8 @@ export { type OpenAIFunctionParameters, } from './utils.js'; -export { FallbackAdapter, type FallbackAdapterOptions } from './fallback_adapter.js'; +export { + FallbackAdapter, + type AvailabilityChangedEvent, + type FallbackAdapterOptions, +} from './fallback_adapter.js'; From b21b2c3922dd981f727b2a6b36876df961065ca1 Mon Sep 17 00:00:00 2001 From: krishnaShuk Date: Sat, 10 Jan 2026 00:39:20 +0530 Subject: [PATCH 5/7] made changes in fallback_adapter --- agents/src/llm/fallback_adapter.ts | 448 ++++++++++++++++++----------- 1 file changed, 288 insertions(+), 160 deletions(-) diff --git a/agents/src/llm/fallback_adapter.ts b/agents/src/llm/fallback_adapter.ts index 179ddb605..6d9f83ea0 100644 --- a/agents/src/llm/fallback_adapter.ts +++ b/agents/src/llm/fallback_adapter.ts @@ -1,81 +1,111 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { APIConnectionError } from '../_exceptions.js'; +import { APIConnectionError, APIError } from '../_exceptions.js'; import { log } from '../log.js'; -import type { LLMMetrics } from '../metrics/base.js'; -import type { APIConnectOptions } from '../types.js'; +import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS } from '../types.js'; import type { ChatContext } from './chat_context.js'; +import type { ChatChunk } from './llm.js'; import { LLM, LLMStream } from './llm.js'; import type { ToolChoice, ToolContext } from './tool_context.js'; -export interface FallbackAdapterOptions { - llms: LLM[]; - attemptTimeout?: number; - maxRetryPerLLM?: number; - retryInterval?: number; - retryOnChunkSent?: boolean; -} +/** + * Default connection options for FallbackAdapter. + * Uses max_retry=0 since fallback handles retries at a higher level. + */ +const DEFAULT_FALLBACK_API_CONNECT_OPTIONS: APIConnectOptions = { + maxRetry: 0, + timeoutMs: DEFAULT_API_CONNECT_OPTIONS.timeoutMs, + retryIntervalMs: DEFAULT_API_CONNECT_OPTIONS.retryIntervalMs, +}; +/** + * Internal status tracking for each LLM instance. + */ interface LLMStatus { available: boolean; - recoveringPromise?: Promise; + recoveringTask: Promise | null; } +/** + * Event emitted when an LLM's availability changes. + */ export interface AvailabilityChangedEvent { llm: LLM; available: boolean; } -export type FallbackLLMCallbacks = { - metrics_collected: (metrics: LLMMetrics) => void; - llm_availability_changed: (event: AvailabilityChangedEvent) => void; - error: (error: Error) => void; -}; +/** + * Options for creating a FallbackAdapter. + */ +export interface FallbackAdapterOptions { + /** List of LLM instances to fallback to (in order). */ + llms: LLM[]; + /** Timeout for each LLM attempt in seconds. Defaults to 5.0. */ + attemptTimeout?: number; + /** Internal retries per LLM before moving to next. Defaults to 0. */ + maxRetryPerLLM?: number; + /** Interval between retries in seconds. Defaults to 0.5. 
*/ + retryInterval?: number; + /** Whether to retry when LLM fails after chunks are sent. Defaults to false. */ + retryOnChunkSent?: boolean; +} +/** + * FallbackAdapter is an LLM that can fallback to a different LLM if the current LLM fails. + * + * @example + * ```typescript + * const fallbackLLM = new FallbackAdapter({ + * llms: [primaryLLM, secondaryLLM, tertiaryLLM], + * attemptTimeout: 5.0, + * maxRetryPerLLM: 1, + * }); + * ``` + */ export class FallbackAdapter extends LLM { - public llms: LLM[]; - public options: Required>; - public status: Map; + readonly llms: LLM[]; + readonly attemptTimeout: number; + readonly maxRetryPerLLM: number; + readonly retryInterval: number; + readonly retryOnChunkSent: boolean; + + /** @internal */ + _status: LLMStatus[]; - private _boundListeners: Map void>; + private logger = log(); constructor(options: FallbackAdapterOptions) { super(); - if (options.llms.length < 1) { - throw new Error('At least one LLM instance must be provided.'); + + if (!options.llms || options.llms.length < 1) { + throw new Error('at least one LLM instance must be provided.'); } this.llms = options.llms; - this.options = { - attemptTimeout: options.attemptTimeout ?? 5.0, - maxRetryPerLLM: options.maxRetryPerLLM ?? 0, - retryInterval: options.retryInterval ?? 0.5, - retryOnChunkSent: options.retryOnChunkSent ?? false, - }; - - this.status = new Map(); - this._boundListeners = new Map(); - - this.llms.forEach((llm) => { - this.status.set(llm, { available: true }); - - const onMetrics = (metrics: LLMMetrics) => { + this.attemptTimeout = options.attemptTimeout ?? 5.0; + this.maxRetryPerLLM = options.maxRetryPerLLM ?? 0; + this.retryInterval = options.retryInterval ?? 0.5; + this.retryOnChunkSent = options.retryOnChunkSent ?? false; + + // Initialize status for each LLM + this._status = this.llms.map(() => ({ + available: true, + recoveringTask: null, + })); + + // Forward metrics_collected events from child LLMs + for (const llm of this.llms) { + llm.on('metrics_collected', (metrics) => { this.emit('metrics_collected', metrics); - }; - llm.on('metrics_collected', onMetrics); - this._boundListeners.set(llm, onMetrics); - }); + }); + } } get model(): string { return 'FallbackAdapter'; } - get provider(): string { - return 'livekit'; - } - label(): string { return 'FallbackAdapter'; } @@ -88,82 +118,41 @@ export class FallbackAdapter extends LLM { toolChoice?: ToolChoice; extraKwargs?: Record; }): LLMStream { - const effectiveOpts = { - timeoutMs: (this.options.attemptTimeout || 5) * 1000, - retryIntervalMs: (this.options.retryInterval || 0.5) * 1000, - ...(opts.connOptions || {}), - maxRetry: 0, - } as APIConnectOptions; - return new FallbackLLMStream(this, { - ...opts, - connOptions: effectiveOpts, - }); - } - - async aclose(): Promise { - this.llms.forEach((llm) => { - const listener = this._boundListeners.get(llm); - if (listener) { - llm.off('metrics_collected', listener); - } + chatCtx: opts.chatCtx, + toolCtx: opts.toolCtx, + connOptions: opts.connOptions || DEFAULT_FALLBACK_API_CONNECT_OPTIONS, + parallelToolCalls: opts.parallelToolCalls, + toolChoice: opts.toolChoice, + extraKwargs: opts.extraKwargs, }); - this._boundListeners.clear(); - await super.aclose(); - } - - markFailed(llm: LLM, chatCtx: ChatContext) { - const s = this.status.get(llm); - - if (s && s.available) { - s.available = false; - - (this as any).emit('llm_availability_changed', { llm, available: false }); - - this.triggerRecovery(llm, chatCtx); - } } - private triggerRecovery(llm: LLM, chatCtx: ChatContext) 
{ - const s = this.status.get(llm); - - if (!s || s.recoveringPromise) return; - - s.recoveringPromise = (async () => { - const logger = log(); - try { - await new Promise((resolve) => setTimeout(resolve, this.options.retryInterval * 1000)); - - logger.debug(`FallbackAdapter: Checking health of ${llm.label()}`); - - const stream = llm.chat({ - chatCtx: chatCtx, - connOptions: { - timeoutMs: 5000, - maxRetry: 0, - retryIntervalMs: 0, - }, - }); - - for await (const _ of stream) { - break; - } - - s.available = true; - (this as any).emit('llm_availability_changed', { llm, available: true }); - logger.info(`FallbackAdapter: Provider ${llm.label()} recovered.`); - } catch (e) { - logger.warn(`FallbackAdapter: Recovery check failed for ${llm.label()}`); - } finally { - s.recoveringPromise = undefined; - } - })(); + /** + * Emit availability changed event. + * @internal + */ + _emitAvailabilityChanged(llm: LLM, available: boolean): void { + const event: AvailabilityChangedEvent = { llm, available }; + // Use type assertion for custom event + (this as unknown as { emit: (event: string, data: AvailabilityChangedEvent) => void }).emit( + 'llm_availability_changed', + event, + ); } } +/** + * LLMStream implementation for FallbackAdapter. + * Handles fallback logic between multiple LLM providers. + */ class FallbackLLMStream extends LLMStream { private adapter: FallbackAdapter; + private parallelToolCalls?: boolean; + private toolChoice?: ToolChoice; + private extraKwargs?: Record; private _currentStream?: LLMStream; + private _log = log(); constructor( adapter: FallbackAdapter, @@ -176,88 +165,227 @@ class FallbackLLMStream extends LLMStream { extraKwargs?: Record; }, ) { - super(adapter, opts); + super(adapter, { + chatCtx: opts.chatCtx, + toolCtx: opts.toolCtx, + connOptions: opts.connOptions, + }); this.adapter = adapter; + this.parallelToolCalls = opts.parallelToolCalls; + this.toolChoice = opts.toolChoice; + this.extraKwargs = opts.extraKwargs; } - get chatCtx(): ChatContext { + /** + * Override chatCtx to return current stream's context if available. + */ + override get chatCtx(): ChatContext { return this._currentStream?.chatCtx ?? super.chatCtx; } - get toolCtx(): ToolContext | undefined { - return this._currentStream?.toolCtx ?? super.toolCtx; - } + /** + * Try to generate with a single LLM. + * Returns an async generator that yields chunks. 
+ */ + private async *tryGenerate( + llm: LLM, + checkRecovery: boolean = false, + ): AsyncGenerator { + const connOptions: APIConnectOptions = { + ...this.connOptions, + maxRetry: this.adapter.maxRetryPerLLM, + timeoutMs: this.adapter.attemptTimeout * 1000, + retryIntervalMs: this.adapter.retryInterval * 1000, + }; - async run(): Promise { - const logger = log(); - const start = Date.now(); + const stream = llm.chat({ + chatCtx: super.chatCtx, + toolCtx: this.toolCtx, + connOptions, + parallelToolCalls: this.parallelToolCalls, + toolChoice: this.toolChoice, + extraKwargs: this.extraKwargs, + }); + + // Listen for error events - child LLMs emit errors via their LLM instance, not the stream + let streamError: Error | undefined; + const errorHandler = (ev: { error: Error }) => { + streamError = ev.error; + }; + llm.on('error', errorHandler); try { - const allFailed = Array.from(this.adapter.status.values()).every((s) => !s.available); - if (allFailed) { - logger.error('All LLMs are unavailable, retrying...'); + let shouldSetCurrent = !checkRecovery; + for await (const chunk of stream) { + if (shouldSetCurrent) { + shouldSetCurrent = false; + this._currentStream = stream; + } + yield chunk; + } + + // If an error was emitted but not thrown through iteration, throw it now + if (streamError) { + throw streamError; + } + } catch (error) { + if (error instanceof APIError) { + if (checkRecovery) { + this._log.warn({ llm: llm.label(), error }, 'recovery failed'); + } else { + this._log.warn({ llm: llm.label(), error }, 'failed, switching to next LLM'); + } + throw error; + } + + // Handle timeout errors + if (error instanceof Error && error.name === 'AbortError') { + if (checkRecovery) { + this._log.warn({ llm: llm.label() }, 'recovery timed out'); + } else { + this._log.warn({ llm: llm.label() }, 'timed out, switching to next LLM'); + } + throw error; } - let candidates = this.adapter.llms.filter((llm) => this.adapter.status.get(llm)?.available); - if (allFailed || candidates.length === 0) { - candidates = this.adapter.llms; + // Unexpected error + if (checkRecovery) { + this._log.error({ llm: llm.label(), error }, 'recovery unexpected error'); + } else { + this._log.error({ llm: llm.label(), error }, 'unexpected error, switching to next LLM'); } + throw error; + } finally { + llm.off('error', errorHandler); + } + } + + /** + * Start background recovery task for an LLM. + */ + private tryRecovery(llm: LLM, index: number): void { + const status = this.adapter._status[index]!; - for (const llm of candidates) { + // Skip if already recovering + if (status.recoveringTask !== null) { + return; + } + + const recoverTask = async (): Promise => { + try { + // Try to generate (just iterate to check if it works) + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _chunk of this.tryGenerate(llm, true)) { + // Just consume the stream to verify it works + } + + // Recovery successful + status.available = true; + this._log.info({ llm: llm.label() }, 'LLM recovered'); + this.adapter._emitAvailabilityChanged(llm, true); + } catch { + // Recovery failed, stay unavailable + } finally { + status.recoveringTask = null; + } + }; + + // Fire and forget + status.recoveringTask = recoverTask(); + } + + /** + * Main run method - iterates through LLMs with fallback logic. 
+ */ + protected async run(): Promise { + const startTime = Date.now(); + + // Check if all LLMs are unavailable + const allFailed = this.adapter._status.every((s) => !s.available); + if (allFailed) { + this._log.error('all LLMs are unavailable, retrying...'); + } + + for (let i = 0; i < this.adapter.llms.length; i++) { + const llm = this.adapter.llms[i]!; + const status = this.adapter._status[i]!; + + this._log.debug( + { llm: llm.label(), index: i, available: status.available, allFailed }, + 'checking LLM', + ); + + if (status.available || allFailed) { let textSent = ''; const toolCallsSent: string[] = []; try { - logger.debug({ label: llm.label() }, 'FallbackAdapter: Attempting provider'); - - const childStream = llm.chat({ - chatCtx: this.chatCtx, - toolCtx: this.toolCtx, - connOptions: { - ...this.connOptions, - timeoutMs: (this.adapter.options.attemptTimeout || 5) * 1000, - maxRetry: this.adapter.options.maxRetryPerLLM, - }, - }); - - this._currentStream = childStream; + this._log.info({ llm: llm.label() }, 'FallbackAdapter: Attempting provider'); - for await (const chunk of childStream) { + let chunkCount = 0; + for await (const chunk of this.tryGenerate(llm, false)) { + chunkCount++; + // Track what's been sent if (chunk.delta) { - if (chunk.delta.content) textSent += chunk.delta.content; + if (chunk.delta.content) { + textSent += chunk.delta.content; + } if (chunk.delta.toolCalls) { - chunk.delta.toolCalls.forEach((tc) => { - if (tc.name) toolCallsSent.push(tc.name); - }); + for (const tc of chunk.delta.toolCalls) { + if (tc.name) { + toolCallsSent.push(tc.name); + } + } } } + + // Forward chunk to queue + this._log.debug({ llm: llm.label(), chunkCount }, 'run: forwarding chunk to queue'); this.queue.put(chunk); } - logger.debug({ label: llm.label() }, 'FallbackAdapter: Provider succeeded'); + // Success! + this._log.info( + { llm: llm.label(), totalChunks: chunkCount, textLength: textSent.length }, + 'FallbackAdapter: Provider succeeded', + ); return; } catch (error) { - const hasSentData = textSent.length > 0 || toolCallsSent.length > 0; - const logContext = { label: llm.label(), error, textSent, toolCallsSent }; - - if (hasSentData && !this.adapter.options.retryOnChunkSent) { - logger.error(logContext, 'Provider failed after sending data. Aborting fallback.'); - throw error; + // Mark as unavailable if it was available before + if (status.available) { + status.available = false; + this.adapter._emitAvailabilityChanged(llm, false); } - logger.warn(logContext, 'FallbackAdapter: Provider failed, switching...'); - this.adapter.markFailed(llm, this.chatCtx); - } finally { - this._currentStream = undefined; + // Check if we sent data before failing + if (textSent || toolCallsSent.length > 0) { + const extra = { textSent, toolCallsSent }; + + if (!this.adapter.retryOnChunkSent) { + this._log.error( + { llm: llm.label(), ...extra }, + 'failed after sending chunk, skip retrying. 
Set `retryOnChunkSent` to `true` to enable.', + ); + throw error; + } + + this._log.warn( + { llm: llm.label(), ...extra }, + 'failed after sending chunk, retrying...', + ); + } } } - const duration = (Date.now() - start) / 1000; - throw new APIConnectionError({ - message: `All Fallback LLMs failed after ${duration}s`, - }); - } finally { - this.queue.close(); + // Trigger background recovery for this LLM + this.tryRecovery(llm, i); } + + // All LLMs failed + const duration = (Date.now() - startTime) / 1000; + const labels = this.adapter.llms.map((l) => l.label()).join(', '); + throw new APIConnectionError({ + message: `all LLMs failed (${labels}) after ${duration.toFixed(2)}s`, + }); } } From 1ec5b631b711d78d23b28d36beab38e1899ef982 Mon Sep 17 00:00:00 2001 From: krishnaShuk Date: Sat, 10 Jan 2026 00:40:14 +0530 Subject: [PATCH 6/7] added test coverage for fallback adapter --- agents/src/llm/fallback_adapter.test.ts | 238 ++++++++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 agents/src/llm/fallback_adapter.test.ts diff --git a/agents/src/llm/fallback_adapter.test.ts b/agents/src/llm/fallback_adapter.test.ts new file mode 100644 index 000000000..84f3a9141 --- /dev/null +++ b/agents/src/llm/fallback_adapter.test.ts @@ -0,0 +1,238 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { beforeAll, describe, expect, it, vi } from 'vitest'; +import { APIConnectionError, APIError } from '../_exceptions.js'; +import { initializeLogger } from '../log.js'; +import type { APIConnectOptions } from '../types.js'; +import { delay } from '../utils.js'; +import type { ChatContext } from './chat_context.js'; +import { FallbackAdapter } from './fallback_adapter.js'; +import { type ChatChunk, LLM, LLMStream } from './llm.js'; +import type { ToolChoice, ToolContext } from './tool_context.js'; + +class MockLLMStream extends LLMStream { + public myLLM: LLM; + + constructor( + llm: LLM, + opts: { + chatCtx: ChatContext; + toolCtx?: ToolContext; + connOptions: APIConnectOptions; + }, + private shouldFail: boolean = false, + private failAfterChunks: number = 0, + ) { + super(llm, opts); + this.myLLM = llm; + } + + protected async run(): Promise { + if (this.shouldFail && this.failAfterChunks === 0) { + throw new APIError('Mock LLM failed immediately'); + } + + const chunk: ChatChunk = { + id: 'test-id', + delta: { role: 'assistant', content: 'chunk' }, + }; + + for (let i = 0; i < 3; i++) { + if (this.shouldFail && i === this.failAfterChunks) { + throw new APIError('Mock LLM failed after chunks'); + } + this.queue.put(chunk); + await delay(10); + } + } +} + +class MockLLM extends LLM { + shouldFail: boolean = false; + failAfterChunks: number = 0; + private _label: string; + + constructor(label: string) { + super(); + this._label = label; + } + + label(): string { + return this._label; + } + + chat(opts: { + chatCtx: ChatContext; + toolCtx?: ToolContext; + connOptions?: APIConnectOptions; + parallelToolCalls?: boolean; + toolChoice?: ToolChoice; + extraKwargs?: Record; + }): LLMStream { + return new MockLLMStream( + this, + { + chatCtx: opts.chatCtx, + toolCtx: opts.toolCtx, + connOptions: opts.connOptions!, + }, + this.shouldFail, + this.failAfterChunks, + ); + } +} + +describe('FallbackAdapter', () => { + beforeAll(() => { + initializeLogger({ pretty: false }); + // Suppress unhandled rejections from LLMStream background tasks + process.on('unhandledRejection', () => {}); + }); + + it('should initialize correctly', () => { + const llm1 = new 
MockLLM('llm1'); + const adapter = new FallbackAdapter({ llms: [llm1] }); + expect(adapter.llms).toHaveLength(1); + expect(adapter.llms[0]).toBe(llm1); + }); + + it('should throw if no LLMs provided', () => { + expect(() => new FallbackAdapter({ llms: [] })).toThrow(); + }); + + it('should use primary LLM if successful', async () => { + const llm1 = new MockLLM('llm1'); + const llm2 = new MockLLM('llm2'); + const adapter = new FallbackAdapter({ llms: [llm1, llm2] }); + + const stream = adapter.chat({ + chatCtx: {} as ChatContext, + }); + + const chunks: ChatChunk[] = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + + expect(chunks).toHaveLength(3); + // Should verify it used llm1 (we can check logs or spy, but simple success is good first step) + }); + + it('should fallback to second LLM if first fails immediately', async () => { + const llm1 = new MockLLM('llm1'); + llm1.shouldFail = true; + const llm2 = new MockLLM('llm2'); + const adapter = new FallbackAdapter({ llms: [llm1, llm2] }); + + const stream = adapter.chat({ + chatCtx: {} as ChatContext, + }); + + const chunks: ChatChunk[] = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + + expect(chunks).toHaveLength(3); + expect(adapter._status[0]!.available).toBe(false); + expect(adapter._status[1]!.available).toBe(true); + }); + + it('should fail if all LLMs fail', async () => { + const llm1 = new MockLLM('llm1'); + llm1.shouldFail = true; + const llm2 = new MockLLM('llm2'); + llm2.shouldFail = true; + const adapter = new FallbackAdapter({ llms: [llm1, llm2] }); + + const stream = adapter.chat({ + chatCtx: {} as ChatContext, + }); + + const errorPromise = new Promise((resolve) => { + adapter.on('error', (e) => resolve(e.error)); + }); + + for await (const _ of stream) { + // consume + } + + const error = await errorPromise; + expect(error).toBeInstanceOf(APIConnectionError); + }); + + it('should fail if chunks sent and retryOnChunkSent is false', async () => { + const llm1 = new MockLLM('llm1'); + llm1.shouldFail = true; + llm1.failAfterChunks = 1; // Fail after 1 chunk + const llm2 = new MockLLM('llm2'); + const adapter = new FallbackAdapter({ + llms: [llm1, llm2], + retryOnChunkSent: false, + }); + + const stream = adapter.chat({ + chatCtx: {} as ChatContext, + }); + + const errorPromise = new Promise((resolve) => { + adapter.on('error', (e) => resolve(e.error)); + }); + + for await (const _ of stream) { + // consume + } + + const error = await errorPromise; + expect(error).toBeInstanceOf(APIError); + }); + + it('should fallback if chunks sent and retryOnChunkSent is true', async () => { + const llm1 = new MockLLM('llm1'); + llm1.shouldFail = true; + llm1.failAfterChunks = 1; + const llm2 = new MockLLM('llm2'); + const adapter = new FallbackAdapter({ + llms: [llm1, llm2], + retryOnChunkSent: true, + }); + + const stream = adapter.chat({ + chatCtx: {} as ChatContext, + }); + + const chunks: ChatChunk[] = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + + // 1 chunk from failed llm1 + 3 chunks from llm2 + expect(chunks).toHaveLength(4); + }); + + it('should emit availability changed events', async () => { + const llm1 = new MockLLM('llm1'); + llm1.shouldFail = true; + const llm2 = new MockLLM('llm2'); + const adapter = new FallbackAdapter({ llms: [llm1, llm2] }); + + const eventSpy = vi.fn(); + (adapter as any).on('llm_availability_changed', eventSpy); + + const stream = adapter.chat({ + chatCtx: {} as ChatContext, + }); + + for await (const _ of stream) { + // consume + } + + 
expect(eventSpy).toHaveBeenCalledWith( + expect.objectContaining({ + llm: llm1, + available: false, + }), + ); + }); +}); From fb996484f37f2bf6b09dbd01ab0c50561ca4f4f0 Mon Sep 17 00:00:00 2001 From: krishnaShuk Date: Sat, 10 Jan 2026 00:42:23 +0530 Subject: [PATCH 7/7] .then to .finally --- agents/src/llm/llm.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agents/src/llm/llm.ts b/agents/src/llm/llm.ts index 746eddd7c..0eb6cea45 100644 --- a/agents/src/llm/llm.ts +++ b/agents/src/llm/llm.ts @@ -135,7 +135,7 @@ export abstract class LLMStream implements AsyncIterableIterator { // is run **after** the constructor has finished. Otherwise we get // runtime error when trying to access class variables in the // `run` method. - startSoon(() => this.mainTask().then(() => this.queue.close())); + startSoon(() => this.mainTask().finally(() => this.queue.close())); } private _mainTaskImpl = async (span: Span) => {
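
Regarding the final commit: switching `.then` to `.finally` guarantees that the stream's queue is closed even when `mainTask()` rejects, so a consumer iterating the stream does not hang when a provider fails. Below is a minimal, self-contained sketch of that behaviour, not part of the patch series; `SimpleQueue`, `demo`, and the chunk strings are illustrative stand-ins rather than types from the agents codebase.

```typescript
// Minimal sketch: why the queue must be closed in `.finally`, not `.then`.
// `SimpleQueue` is an illustrative stand-in for the framework's internal queue.
class SimpleQueue<T> {
  private items: T[] = [];
  private closed = false;
  private waiters: Array<() => void> = [];

  put(item: T) {
    this.items.push(item);
    this.waiters.splice(0).forEach((wake) => wake());
  }

  close() {
    this.closed = true;
    this.waiters.splice(0).forEach((wake) => wake());
  }

  async *[Symbol.asyncIterator]() {
    while (true) {
      if (this.items.length > 0) {
        yield this.items.shift()!;
      } else if (this.closed) {
        return; // ends the consumer's for-await loop
      } else {
        await new Promise<void>((resolve) => this.waiters.push(resolve));
      }
    }
  }
}

async function demo(failingRun: boolean) {
  const queue = new SimpleQueue<string>();

  const mainTask = async () => {
    queue.put('chunk-1');
    if (failingRun) throw new Error('provider failed');
    queue.put('chunk-2');
  };

  // With `.then(() => queue.close())`, a rejected mainTask never reaches the
  // close() callback, so the consumer below would wait forever for more chunks.
  // `.finally` closes the queue on success *and* failure.
  mainTask()
    .finally(() => queue.close())
    .catch(() => {
      /* the error is surfaced elsewhere (e.g. an 'error' event) */
    });

  for await (const chunk of queue) {
    console.log('consumed', chunk);
  }
  console.log('iteration finished, failingRun =', failingRun);
}

demo(true).then(() => demo(false));
```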