From 785fb6ec6925fcf66838ae065815f45154a8b16d Mon Sep 17 00:00:00 2001 From: Claude Agent Date: Wed, 28 Jan 2026 13:46:37 +0000 Subject: [PATCH] Bug RHOAIENG-39117: Implement proper AG-UI system/developer message roles MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem System messages (like "Repository added", "End turn X", "Workflow loaded") were not appearing in the chat even when "Show system messages" was enabled. The issue was that the runner was not emitting them as proper AG-UI TextMessage events with the correct role field. ## Root Cause The adapter.py was emitting platform logs and system messages as RawEvent with type "system_log", rather than as proper AG-UI TextMessage events with role fields as specified in the AG-UI protocol. ## Changes ### Backend (adapter.py) - Added helper method _emit_developer_message() to emit proper AG-UI TextMessage events with role="developer" for platform logging - Converted all system_log RawEvent emissions to TextMessage events with roles: - Platform logs (repo cloning, workflow loading, etc.) → role="developer" - Claude system messages (turn info, etc.) remain as debug logs - Updated messages for: - Repository cloning and workspace preparation - Multi-repo workspace operations - Workflow initialization and cloning - Session continuation messages - Error messages ### Frontend Types (agentic-session.ts) - Added optional role field to UserMessage and AgentMessage types - Created new DeveloperMessage type for role="developer" messages - Created new SystemRoleMessage type for role="system" messages - Updated Message union type to include new message types ### Frontend Components (stream-message.tsx) - Updated message rendering to handle AG-UI role field - Added role-based display logic: - role="assistant" → displayed as Claude AI (bot) - role="developer" → displayed as Platform (system) - role="system" → displayed as System (system) - role="user" → displayed as You (user) ### UI Component (message.tsx) - Extended MessageRole type to include "system" - Updated avatar rendering for system role messages (gray "SYS" badge) - Added appropriate styling for system messages (muted background, muted text) - Updated layout logic to handle system role messages ### Message Filtering (MessagesTab.tsx) - Updated filter logic to check for AG-UI role field - Now filters messages by role="system" or role="developer" when showSystemMessages is false - Maintains backward compatibility with legacy type="system_message" ## Testing System messages should now: - Be emitted as proper AG-UI TextMessage events with roles - Appear in chat when "Show system messages" checkbox is enabled - Be hidden by default when checkbox is disabled - Render with appropriate styling (gray badge, muted background) Co-Authored-By: Claude (claude-sonnet-4-5) --- .../src/components/session/MessagesTab.tsx | 12 +- .../frontend/src/components/ui/message.tsx | 23 +- .../src/components/ui/stream-message.tsx | 26 +- .../frontend/src/types/agentic-session.ts | 18 +- .../runners/claude-code-runner/adapter.py | 272 ++++++++++++------ 5 files changed, 242 insertions(+), 109 deletions(-) diff --git a/components/frontend/src/components/session/MessagesTab.tsx b/components/frontend/src/components/session/MessagesTab.tsx index 3e45d60b6..4e8d7bf98 100644 --- a/components/frontend/src/components/session/MessagesTab.tsx +++ b/components/frontend/src/components/session/MessagesTab.tsx @@ -73,13 +73,17 @@ const MessagesTab: React.FC = ({ session, streamMessages, chat // Filter out system messages unless showSystemMessages is true const filteredMessages = streamMessages.filter((msg) => { if (showSystemMessages) return true; - - // Hide system_message type by default - // Check if msg has a type property and if it's a system_message + + // Hide system_message type by default (legacy) if ('type' in msg && msg.type === "system_message") { return false; } - + + // Hide messages with system or developer role (AG-UI protocol) + if ('role' in msg && (msg.role === "system" || msg.role === "developer")) { + return false; + } + return true; }); diff --git a/components/frontend/src/components/ui/message.tsx b/components/frontend/src/components/ui/message.tsx index 398858e87..67cb93507 100644 --- a/components/frontend/src/components/ui/message.tsx +++ b/components/frontend/src/components/ui/message.tsx @@ -7,7 +7,7 @@ import remarkGfm from "remark-gfm"; import type { Components } from "react-markdown"; import { formatTimestamp } from "@/lib/format-timestamp"; -export type MessageRole = "bot" | "user"; +export type MessageRole = "bot" | "user" | "system"; export type MessageProps = { role: MessageRole; @@ -176,8 +176,9 @@ export const Message = React.forwardRef( ref ) => { const isBot = role === "bot"; - const avatarBg = isBot ? "bg-blue-600" : "bg-green-600"; - const avatarText = isBot ? "AI" : "U"; + const isSystem = role === "system"; + const avatarBg = isBot ? "bg-blue-600" : isSystem ? "bg-gray-500" : "bg-green-600"; + const avatarText = isBot ? "AI" : isSystem ? "SYS" : "U"; const formattedTime = formatTimestamp(timestamp); const isActivelyStreaming = streaming && isBot; @@ -198,25 +199,25 @@ export const Message = React.forwardRef( ) return ( -
-
+
+
{/* Avatar */} - {isBot ? avatar : null} + {(isBot || isSystem) ? avatar : null} {/* Message Content */} -
+
{/* Timestamp */} {formattedTime && ( -
+
{formattedTime}
)}
{/* Content */} -
+
{isLoading ? (
{content}
@@ -243,7 +244,7 @@ export const Message = React.forwardRef(
- {isBot ? null : avatar} + {(isBot || isSystem) ? null : avatar}
); diff --git a/components/frontend/src/components/ui/stream-message.tsx b/components/frontend/src/components/ui/stream-message.tsx index 77cbab175..026dcd24d 100644 --- a/components/frontend/src/components/ui/stream-message.tsx +++ b/components/frontend/src/components/ui/stream-message.tsx @@ -63,14 +63,36 @@ export const StreamMessage: React.FC = ({ message, onGoToRes case "user_message": case "agent_message": { const isStreaming = 'streaming' in message && message.streaming; + + // Check for AG-UI role field + const role = 'role' in m ? m.role : undefined; + + // Determine display role based on AG-UI role + let displayRole: "user" | "bot" | "system" = "user"; + let displayName = "You"; + + if (role === "assistant" || (m.type === "agent_message" && !role)) { + displayRole = "bot"; + displayName = "Claude AI"; + } else if (role === "developer") { + displayRole = "system"; + displayName = "Platform"; + } else if (role === "system") { + displayRole = "system"; + displayName = "System"; + } else if (role === "user" || m.type === "user_message") { + displayRole = "user"; + displayName = "You"; + } + if (typeof m.content === "string") { - return ; + return ; } switch (m.content.type) { case "thinking_block": return case "text_block": - return + return case "tool_use_block": return case "tool_result_block": diff --git a/components/frontend/src/types/agentic-session.ts b/components/frontend/src/types/agentic-session.ts index 4deb326cb..8544307d6 100644 --- a/components/frontend/src/types/agentic-session.ts +++ b/components/frontend/src/types/agentic-session.ts @@ -102,7 +102,7 @@ export type HierarchicalToolMessage = ToolUseMessages & { // ----------------------------- // Message Types // ----------------------------- -export type Message = UserMessage | AgentMessage | SystemMessage | ResultMessage | ToolUseMessages | AgentRunningMessage | AgentWaitingMessage; +export type Message = UserMessage | AgentMessage | SystemMessage | ResultMessage | ToolUseMessages | AgentRunningMessage | AgentWaitingMessage | DeveloperMessage | SystemRoleMessage; export type AgentRunningMessage = { type: "agent_running"; @@ -117,19 +117,35 @@ export type UserMessage = { type: "user_message"; content: ContentBlock | string; timestamp: string; + role?: "user"; // AG-UI role field } export type AgentMessage = { type: "agent_message"; content: ContentBlock; model: string; timestamp: string; + role?: "assistant"; // AG-UI role field } +// Legacy system_message type (for compatibility) export type SystemMessage = { type: "system_message"; subtype: string; data: Record; timestamp: string; } +// New AG-UI role-based messages +export type DeveloperMessage = { + type: "user_message" | "agent_message"; + content: ContentBlock | string; + timestamp: string; + role: "developer"; // Platform/internal logging +} +export type SystemRoleMessage = { + type: "user_message" | "agent_message"; + content: ContentBlock | string; + timestamp: string; + role: "system"; // Claude system messages (turn info, etc) +} export type ResultMessage = { type: "result_message"; subtype: string; diff --git a/components/runners/claude-code-runner/adapter.py b/components/runners/claude-code-runner/adapter.py index 419e493d2..3f450993d 100644 --- a/components/runners/claude-code-runner/adapter.py +++ b/components/runners/claude-code-runner/adapter.py @@ -107,6 +107,32 @@ def _timestamp(self) -> str: """Return current UTC timestamp in ISO format.""" return datetime.now(timezone.utc).isoformat() + async def _emit_developer_message( + self, message: str, thread_id: str, run_id: str + ) -> AsyncIterator[BaseEvent]: + """Helper to emit a developer role message (platform logging).""" + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta=message, + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + ) + async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEvent]: """ Process a run and yield AG-UI events. @@ -199,11 +225,27 @@ async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEven if not user_message: logger.warning("No user message found in input") - yield RawEvent( - type=EventType.RAW, + # Emit as developer message (platform logging) + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta="No user message provided", + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, thread_id=thread_id, run_id=run_id, - event={"type": "system_log", "message": "No user message provided"} + message_id=msg_id, ) yield RunFinishedEvent( type=EventType.RUN_FINISHED, @@ -431,11 +473,27 @@ async def _run_claude_agent_sdk( try: options.continue_conversation = True logger.info("Enabled continue_conversation for session resumption") - yield RawEvent( - type=EventType.RAW, + # Emit as developer message (platform logging) + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, thread_id=thread_id, run_id=run_id, - event={"type": "system_log", "message": "🔄 Continuing conversation from previous state"} + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta="🔄 Continuing conversation from previous state", + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, ) except Exception as e: logger.warning(f"Failed to set continue_conversation: {e}") @@ -492,11 +550,27 @@ def create_sdk_client(opts, disable_continue=False): error_str = str(resume_error).lower() if "no conversation found" in error_str or "session" in error_str: logger.warning(f"Conversation continuation failed: {resume_error}") - yield RawEvent( - type=EventType.RAW, + # Emit as developer message (platform logging) + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta="⚠️ Could not continue conversation, starting fresh...", + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, thread_id=thread_id, run_id=run_id, - event={"type": "system_log", "message": "⚠️ Could not continue conversation, starting fresh..."} + message_id=msg_id, ) client = create_sdk_client(options, disable_continue=True) await client.connect() @@ -506,13 +580,29 @@ def create_sdk_client(opts, disable_continue=False): try: # Store client reference for interrupt support self._active_client = client - + if not self._first_run: - yield RawEvent( - type=EventType.RAW, + # Emit as developer message (platform logging) + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, thread_id=thread_id, run_id=run_id, - event={"type": "system_log", "message": "✅ Continuing conversation"} + message_id=msg_id, + delta="✅ Continuing conversation", + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, ) logger.info("SDK continuing conversation from local state") @@ -927,30 +1017,30 @@ async def _prepare_workspace(self) -> AsyncIterator[BaseEvent]: try: if not workspace_has_git: - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": "📥 Cloning input repository..."} - ) + async for event in self._emit_developer_message( + "📥 Cloning input repository...", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event clone_url = self._url_with_token(input_repo, token) if token else input_repo await self._run_cmd(["git", "clone", "--branch", input_branch, "--single-branch", clone_url, str(workspace)], cwd=str(workspace.parent)) await self._run_cmd(["git", "remote", "set-url", "origin", clone_url], cwd=str(workspace), ignore_errors=True) elif reusing_workspace: - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": "✓ Preserving workspace (continuation)"} - ) + async for event in self._emit_developer_message( + "✓ Preserving workspace (continuation)", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event await self._run_cmd(["git", "remote", "set-url", "origin", self._url_with_token(input_repo, token) if token else input_repo], cwd=str(workspace), ignore_errors=True) else: - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": "🔄 Resetting workspace to clean state"} - ) + async for event in self._emit_developer_message( + "🔄 Resetting workspace to clean state", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event await self._run_cmd(["git", "remote", "set-url", "origin", self._url_with_token(input_repo, token) if token else input_repo], cwd=str(workspace)) await self._run_cmd(["git", "fetch", "origin", input_branch], cwd=str(workspace)) await self._run_cmd(["git", "checkout", input_branch], cwd=str(workspace)) @@ -969,12 +1059,12 @@ async def _prepare_workspace(self) -> AsyncIterator[BaseEvent]: except Exception as e: logger.error(f"Failed to prepare workspace: {e}") - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"Workspace preparation failed: {e}"} - ) + async for event in self._emit_developer_message( + f"Workspace preparation failed: {e}", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event # Create artifacts directory try: @@ -1001,30 +1091,30 @@ async def _prepare_multi_repo_workspace( repo_exists = repo_dir.exists() and (repo_dir / ".git").exists() if not repo_exists: - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"📥 Cloning {name}..."} - ) + async for event in self._emit_developer_message( + f"📥 Cloning {name}...", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event clone_url = self._url_with_token(url, token) if token else url await self._run_cmd(["git", "clone", "--branch", branch, "--single-branch", clone_url, str(repo_dir)], cwd=str(workspace)) await self._run_cmd(["git", "remote", "set-url", "origin", clone_url], cwd=str(repo_dir), ignore_errors=True) elif reusing_workspace: - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"✓ Preserving {name} (continuation)"} - ) + async for event in self._emit_developer_message( + f"✓ Preserving {name} (continuation)", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event await self._run_cmd(["git", "remote", "set-url", "origin", self._url_with_token(url, token) if token else url], cwd=str(repo_dir), ignore_errors=True) else: - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"🔄 Resetting {name} to clean state"} - ) + async for event in self._emit_developer_message( + f"🔄 Resetting {name} to clean state", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event await self._run_cmd(["git", "remote", "set-url", "origin", self._url_with_token(url, token) if token else url], cwd=str(repo_dir), ignore_errors=True) await self._run_cmd(["git", "fetch", "origin", branch], cwd=str(repo_dir)) await self._run_cmd(["git", "checkout", branch], cwd=str(repo_dir)) @@ -1046,12 +1136,12 @@ async def _prepare_multi_repo_workspace( except Exception as e: logger.error(f"Failed to prepare multi-repo workspace: {e}") - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"Workspace preparation failed: {e}"} - ) + async for event in self._emit_developer_message( + f"Workspace preparation failed: {e}", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event async def _validate_prerequisites(self): """Validate prerequisite files exist for phase-based slash commands.""" @@ -1130,22 +1220,22 @@ async def _clone_workflow_repository( temp_clone_dir = workspace / "workflows" / f"{workflow_name}-clone-temp" if workflow_dir.exists(): - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"✓ Workflow {workflow_name} already loaded"} - ) + async for event in self._emit_developer_message( + f"✓ Workflow {workflow_name} already loaded", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event return token = await self._fetch_token_for_url(git_url) - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"📥 Cloning workflow {workflow_name}..."} - ) + async for event in self._emit_developer_message( + f"📥 Cloning workflow {workflow_name}...", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event clone_url = self._url_with_token(git_url, token) if token else git_url await self._run_cmd(["git", "clone", "--branch", branch, "--single-branch", clone_url, str(temp_clone_dir)], cwd=str(workspace)) @@ -1155,29 +1245,29 @@ async def _clone_workflow_repository( if subdir_path.exists() and subdir_path.is_dir(): shutil.copytree(subdir_path, workflow_dir) shutil.rmtree(temp_clone_dir) - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"✓ Extracted workflow from: {path}"} - ) + async for event in self._emit_developer_message( + f"✓ Extracted workflow from: {path}", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event else: temp_clone_dir.rename(workflow_dir) - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"⚠️ Path '{path}' not found, using full repository"} - ) + async for event in self._emit_developer_message( + f"⚠️ Path '{path}' not found, using full repository", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event else: temp_clone_dir.rename(workflow_dir) - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"✅ Workflow {workflow_name} ready"} - ) + async for event in self._emit_developer_message( + f"✅ Workflow {workflow_name} ready", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event async def _run_cmd(self, cmd, cwd=None, capture_stdout=False, ignore_errors=False): """Run a subprocess command asynchronously."""