-
Notifications
You must be signed in to change notification settings - Fork 109
fix: several issues related to scalability #1619
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ff4bc16
236ed33
cc0063b
9a31701
dd3b07c
da051cf
f49c409
ea5b06f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,6 +58,7 @@ class LocalConversation(BaseConversation): | |
| llm_registry: LLMRegistry | ||
| _cleanup_initiated: bool | ||
| _hook_processor: HookEventProcessor | None | ||
| stop_agent_on_close: bool = False | ||
|
|
||
| def __init__( | ||
| self, | ||
|
|
@@ -77,6 +78,7 @@ def __init__( | |
| type[ConversationVisualizerBase] | ConversationVisualizerBase | None | ||
| ) = DefaultConversationVisualizer, | ||
| secrets: Mapping[str, SecretValue] | None = None, | ||
| stop_agent_on_close: bool = False, | ||
| **_: object, | ||
| ): | ||
| """Initialize the conversation. | ||
|
|
@@ -222,6 +224,7 @@ def _default_callback(e): | |
|
|
||
| atexit.register(self.close) | ||
| self._start_observability_span(str(desired_id)) | ||
| self.stop_agent_on_close = stop_agent_on_close | ||
|
|
||
| @property | ||
| def id(self) -> ConversationID: | ||
|
|
@@ -535,20 +538,23 @@ def close(self) -> None: | |
| except AttributeError: | ||
| # Object may be partially constructed; span fields may be missing. | ||
| pass | ||
| try: | ||
| tools_map = self.agent.tools_map | ||
| except (AttributeError, RuntimeError): | ||
| # Agent not initialized or partially constructed | ||
| return | ||
| for tool in tools_map.values(): | ||
| if self.stop_agent_on_close: | ||
| try: | ||
| executable_tool = tool.as_executable() | ||
| executable_tool.executor.close() | ||
| except NotImplementedError: | ||
| # Tool has no executor, skip it without erroring | ||
| continue | ||
| except Exception as e: | ||
| logger.warning(f"Error closing executor for tool '{tool.name}': {e}") | ||
| tools_map = self.agent.tools_map | ||
| except (AttributeError, RuntimeError): | ||
| # Agent not initialized or partially constructed | ||
| return | ||
| for tool in tools_map.values(): | ||
| try: | ||
| executable_tool = tool.as_executable() | ||
| executable_tool.executor.close() | ||
| except NotImplementedError: | ||
| # Tool has no executor, skip it without erroring | ||
| continue | ||
| except Exception as e: | ||
| logger.warning( | ||
| f"Error closing executor for tool '{tool.name}': {e}" | ||
| ) | ||
|
Comment on lines
+541
to
+557
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Breaking Change: Tool executors no longer cleaned up by default Previously, tool executors were always closed when conversations ended. Now they are only closed if This is a breaking behavioral change that will:
Why was this behavior changed? If there was a specific reason to make cleanup opt-in, it should be documented. Otherwise, executors should always be cleaned up to prevent resource leaks. |
||
|
|
||
| def ask_agent(self, question: str) -> str: | ||
| """Ask the agent a simple, stateless question and get a direct LLM response. | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -439,6 +439,7 @@ class RemoteConversation(BaseConversation): | |||||||||||||||||||||||||||||
| _client: httpx.Client | ||||||||||||||||||||||||||||||
| _hook_processor: HookEventProcessor | None | ||||||||||||||||||||||||||||||
| _cleanup_initiated: bool | ||||||||||||||||||||||||||||||
| stop_agent_on_close: bool = False | ||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we consider rename it to smth like: |
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def __init__( | ||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||
|
|
@@ -456,6 +457,7 @@ def __init__( | |||||||||||||||||||||||||||||
| type[ConversationVisualizerBase] | ConversationVisualizerBase | None | ||||||||||||||||||||||||||||||
| ) = DefaultConversationVisualizer, | ||||||||||||||||||||||||||||||
| secrets: Mapping[str, SecretValue] | None = None, | ||||||||||||||||||||||||||||||
| stop_agent_on_close: bool = False, | ||||||||||||||||||||||||||||||
| **_: object, | ||||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||||
| """Remote conversation proxy that talks to an agent server. | ||||||||||||||||||||||||||||||
|
|
@@ -623,6 +625,7 @@ def __init__( | |||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
| self._hook_processor = HookEventProcessor(hook_manager=hook_manager) | ||||||||||||||||||||||||||||||
| self._hook_processor.run_session_start() | ||||||||||||||||||||||||||||||
| self.stop_agent_on_close = stop_agent_on_close | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def _create_llm_completion_log_callback(self) -> ConversationCallbackType: | ||||||||||||||||||||||||||||||
| """Create a callback that writes LLM completion logs to client filesystem.""" | ||||||||||||||||||||||||||||||
|
|
@@ -992,6 +995,13 @@ def close(self) -> None: | |||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| self._end_observability_span() | ||||||||||||||||||||||||||||||
| if self.stop_agent_on_close: | ||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||
| # trigger server-side delete_conversation to release resources | ||||||||||||||||||||||||||||||
| # like tmux sessions | ||||||||||||||||||||||||||||||
| _send_request(self._client, "DELETE", f"/api/conversations/{self.id}") | ||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||
|
Comment on lines
+998
to
+1004
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Breaking Change: Server-side cleanup now opt-in An earlier commit (236ed33) always called DELETE to clean up tmux sessions. This was later changed to be opt-in via This means:
Additionally, the bare
Suggested change
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def __del__(self) -> None: | ||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -50,12 +50,17 @@ def client(self) -> httpx.Client: | |||||
| if client is None: | ||||||
| # Configure reasonable timeouts for HTTP requests | ||||||
| # - connect: 10 seconds to establish connection | ||||||
| # - read: 60 seconds to read response (for LLM operations) | ||||||
| # - read: 600 seconds to read response (for LLM operations) | ||||||
xingyaoww marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| # - write: 10 seconds to send request | ||||||
| # - pool: 10 seconds to get connection from pool | ||||||
| timeout = httpx.Timeout(connect=10.0, read=60.0, write=10.0, pool=10.0) | ||||||
| timeout = httpx.Timeout( | ||||||
| connect=10.0, read=self.read_timeout, write=10.0, pool=10.0 | ||||||
| ) | ||||||
| client = httpx.Client( | ||||||
| base_url=self.host, timeout=timeout, headers=self._headers | ||||||
| base_url=self.host, | ||||||
| timeout=timeout, | ||||||
| headers=self._headers, | ||||||
| limits=httpx.Limits(max_connections=None), | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical: Unbounded connection limit is dangerous Setting
Recommend setting a large but bounded limit instead:
Suggested change
This allows scaling to 1000 concurrent connections (10x the default) while still providing protection against runaway resource usage.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about we keep the limits to be default? |
||||||
| ) | ||||||
| self._client = client | ||||||
| return client | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -221,6 +221,8 @@ def _start_container(self, image: str, context: Any) -> None: | |
| "--platform", | ||
| self.platform, | ||
| "--rm", | ||
| "--ulimit", | ||
| "nofile=65536:65536", # prevent "too many open files" errors | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider ApptainerWorkspace This ulimit fix is only applied to DockerWorkspace. Does ApptainerWorkspace need the same fix? If users run large-scale jobs with Apptainer, they could still hit "too many open files" errors. Check if |
||
| "--name", | ||
| f"agent-server-{uuid.uuid4()}", | ||
| *flags, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Issue: Confusing parameter name
The name
stop_agent_on_closeis misleading - it sounds like it stops the agent, but it actually controls whether server-side resources (tmux sessions, executors) are cleaned up. This will confuse users.Consider a more descriptive name:
cleanup_server_resourcesdelete_conversation_on_closerelease_resources_on_closeAlso, defaulting to
Falsemeans the scalability issues described in the PR (tmux session accumulation, resource leaks) will still occur by default. Users must explicitly opt-in to the fixes. Should this default toTrueto actually fix the problems?