Inconsistent queue closure in LLMStream #871

@yura2100

Description

Problem

The base LLMStream class closes the queue only if mainTask succeeds:

startSoon(() => this.mainTask().then(() => this.queue.close()));

However, the inference LLMStream subclass also closes the same queue in the finally block of its run method.

This differs from the Python implementation, where the event channel is closed only after the main task is done, regardless of whether it succeeded:

https://github.com/livekit/agents/blob/ff7c9ddb4b3eb13b407637e71035470ff76e2814/livekit-agents/livekit/agents/llm/llm.py#L176

As I understand it, the LiveKit team aims to maintain 100% parity with the Python implementation, so it may make more sense to close the queue only in the base LLMStream class, using this.mainTask().finally(() => this.queue.close()).
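A minimal sketch of the suggested change (only the .then → .finally swap; the startSoon call is the one quoted above):

// current: the queue stays open if mainTask rejects
startSoon(() => this.mainTask().then(() => this.queue.close()));

// suggested: close the queue whether mainTask resolves or rejects,
// matching the Python implementation's finally semantics
startSoon(() => this.mainTask().finally(() => this.queue.close()));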

Context

Our team is trying to implement dynamic LLM timeouts that grow with the current retry attempt. We implemented the following class to do this, and we pass an instance of it to activity.llm.chat in the llmNode hook.

import type { APIConnectOptions } from '@livekit/agents'; // assuming the type is exported from @livekit/agents

class ExponentialTimeoutAPIConnectOptions implements APIConnectOptions {
  readonly maxRetry: number;
  readonly retryIntervalMs: number;
  readonly #baseTimeoutMs: number;
  #retries: number;

  constructor(options: Partial<APIConnectOptions> = {}) {
    this.maxRetry = options.maxRetry ?? 5;
    this.retryIntervalMs = options.retryIntervalMs ?? 2000;
    this.#baseTimeoutMs = options.timeoutMs ?? 500;
    this.#retries = 0;

    if (this.maxRetry < 0) {
      throw new Error("maxRetry must be greater than or equal to 0");
    }
    if (this.retryIntervalMs < 0) {
      throw new Error("retryIntervalMs must be greater than or equal to 0");
    }
    if (this.timeoutMs < 0) {
      throw new Error("timeoutMs must be greater than or equal to 0");
    }
  }

  get timeoutMs(): number {
    return this.#baseTimeoutMs * 2 ** this.#retries;
  }

  /**
   * Return the interval for the given number of retries.
   *
   * The first retry is immediate; subsequent retries use the specified retryIntervalMs.
   *
   * @internal
   */
  _intervalForRetry(numRetries: number): number {
    // Track the upcoming attempt so the timeoutMs getter grows exponentially.
    this.#retries = numRetries + 1;
    if (numRetries === 0) {
      return 0.1;
    }
    return this.retryIntervalMs;
  }
}
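For illustration, here is how timeoutMs would grow across attempts, assuming (as described above) that the framework calls _intervalForRetry before each retry; this uses only the class defined above:

const options = new ExponentialTimeoutAPIConnectOptions({ timeoutMs: 500 });
console.log(options.timeoutMs); // 500  -> attempt 0: 500 * 2 ** 0
options._intervalForRetry(0);   // called before retry 1; returns 0.1 (immediate)
console.log(options.timeoutMs); // 1000 -> attempt 1: 500 * 2 ** 1
options._intervalForRetry(1);   // called before retry 2; returns retryIntervalMs
console.log(options.timeoutMs); // 2000 -> attempt 2: 500 * 2 ** 2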

In theory this should work, but in practice we receive a Queue is closed error after the first attempt. After some investigation, we realized that everything works perfectly without this finally block in the inference LLMStream subclass: presumably the first attempt's finally closes the shared queue, so subsequent retries fail as soon as they touch it.
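To make the failure mode concrete, here is a self-contained repro of the pattern as we understand it. MiniQueue, attemptOnce, and the retry loop are simplified stand-ins for the agents-js internals, not the actual code:

class MiniQueue<T> {
  #closed = false;
  #items: T[] = [];
  put(item: T): void {
    if (this.#closed) throw new Error('Queue is closed');
    this.#items.push(item);
  }
  close(): void {
    this.#closed = true;
  }
}

async function attemptOnce(queue: MiniQueue<string>, shouldFail: boolean): Promise<void> {
  try {
    queue.put('chunk'); // throws on attempt 1: the queue was already closed
    if (shouldFail) throw new Error('timeout');
  } finally {
    queue.close(); // mirrors the finally block in the inference LLMStream subclass
  }
}

async function mainTask(): Promise<void> {
  const queue = new MiniQueue<string>();
  for (let attempt = 0; attempt < 2; attempt++) {
    try {
      await attemptOnce(queue, attempt === 0); // only the first attempt times out
      return;
    } catch (err) {
      console.error(`attempt ${attempt} failed:`, err); // attempt 1: "Queue is closed"
    }
  }
}

void mainTask();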
