diff --git a/components/frontend/src/components/session/MessagesTab.tsx b/components/frontend/src/components/session/MessagesTab.tsx index 3e45d60b6..4e8d7bf98 100644 --- a/components/frontend/src/components/session/MessagesTab.tsx +++ b/components/frontend/src/components/session/MessagesTab.tsx @@ -73,13 +73,17 @@ const MessagesTab: React.FC = ({ session, streamMessages, chat // Filter out system messages unless showSystemMessages is true const filteredMessages = streamMessages.filter((msg) => { if (showSystemMessages) return true; - - // Hide system_message type by default - // Check if msg has a type property and if it's a system_message + + // Hide system_message type by default (legacy) if ('type' in msg && msg.type === "system_message") { return false; } - + + // Hide messages with system or developer role (AG-UI protocol) + if ('role' in msg && (msg.role === "system" || msg.role === "developer")) { + return false; + } + return true; }); diff --git a/components/frontend/src/components/ui/message.tsx b/components/frontend/src/components/ui/message.tsx index 398858e87..67cb93507 100644 --- a/components/frontend/src/components/ui/message.tsx +++ b/components/frontend/src/components/ui/message.tsx @@ -7,7 +7,7 @@ import remarkGfm from "remark-gfm"; import type { Components } from "react-markdown"; import { formatTimestamp } from "@/lib/format-timestamp"; -export type MessageRole = "bot" | "user"; +export type MessageRole = "bot" | "user" | "system"; export type MessageProps = { role: MessageRole; @@ -176,8 +176,9 @@ export const Message = React.forwardRef( ref ) => { const isBot = role === "bot"; - const avatarBg = isBot ? "bg-blue-600" : "bg-green-600"; - const avatarText = isBot ? "AI" : "U"; + const isSystem = role === "system"; + const avatarBg = isBot ? "bg-blue-600" : isSystem ? "bg-gray-500" : "bg-green-600"; + const avatarText = isBot ? "AI" : isSystem ? "SYS" : "U"; const formattedTime = formatTimestamp(timestamp); const isActivelyStreaming = streaming && isBot; @@ -198,25 +199,25 @@ export const Message = React.forwardRef( ) return ( -
-
+
+
{/* Avatar */} - {isBot ? avatar : null} + {(isBot || isSystem) ? avatar : null} {/* Message Content */} -
+
{/* Timestamp */} {formattedTime && ( -
+
{formattedTime}
)}
{/* Content */} -
+
{isLoading ? (
{content}
@@ -243,7 +244,7 @@ export const Message = React.forwardRef(
- {isBot ? null : avatar} + {(isBot || isSystem) ? null : avatar}
); diff --git a/components/frontend/src/components/ui/stream-message.tsx b/components/frontend/src/components/ui/stream-message.tsx index 77cbab175..026dcd24d 100644 --- a/components/frontend/src/components/ui/stream-message.tsx +++ b/components/frontend/src/components/ui/stream-message.tsx @@ -63,14 +63,36 @@ export const StreamMessage: React.FC = ({ message, onGoToRes case "user_message": case "agent_message": { const isStreaming = 'streaming' in message && message.streaming; + + // Check for AG-UI role field + const role = 'role' in m ? m.role : undefined; + + // Determine display role based on AG-UI role + let displayRole: "user" | "bot" | "system" = "user"; + let displayName = "You"; + + if (role === "assistant" || (m.type === "agent_message" && !role)) { + displayRole = "bot"; + displayName = "Claude AI"; + } else if (role === "developer") { + displayRole = "system"; + displayName = "Platform"; + } else if (role === "system") { + displayRole = "system"; + displayName = "System"; + } else if (role === "user" || m.type === "user_message") { + displayRole = "user"; + displayName = "You"; + } + if (typeof m.content === "string") { - return ; + return ; } switch (m.content.type) { case "thinking_block": return case "text_block": - return + return case "tool_use_block": return case "tool_result_block": diff --git a/components/frontend/src/types/agentic-session.ts b/components/frontend/src/types/agentic-session.ts index 4deb326cb..8544307d6 100644 --- a/components/frontend/src/types/agentic-session.ts +++ b/components/frontend/src/types/agentic-session.ts @@ -102,7 +102,7 @@ export type HierarchicalToolMessage = ToolUseMessages & { // ----------------------------- // Message Types // ----------------------------- -export type Message = UserMessage | AgentMessage | SystemMessage | ResultMessage | ToolUseMessages | AgentRunningMessage | AgentWaitingMessage; +export type Message = UserMessage | AgentMessage | SystemMessage | ResultMessage | ToolUseMessages | AgentRunningMessage | AgentWaitingMessage | DeveloperMessage | SystemRoleMessage; export type AgentRunningMessage = { type: "agent_running"; @@ -117,19 +117,35 @@ export type UserMessage = { type: "user_message"; content: ContentBlock | string; timestamp: string; + role?: "user"; // AG-UI role field } export type AgentMessage = { type: "agent_message"; content: ContentBlock; model: string; timestamp: string; + role?: "assistant"; // AG-UI role field } +// Legacy system_message type (for compatibility) export type SystemMessage = { type: "system_message"; subtype: string; data: Record; timestamp: string; } +// New AG-UI role-based messages +export type DeveloperMessage = { + type: "user_message" | "agent_message"; + content: ContentBlock | string; + timestamp: string; + role: "developer"; // Platform/internal logging +} +export type SystemRoleMessage = { + type: "user_message" | "agent_message"; + content: ContentBlock | string; + timestamp: string; + role: "system"; // Claude system messages (turn info, etc) +} export type ResultMessage = { type: "result_message"; subtype: string; diff --git a/components/runners/claude-code-runner/adapter.py b/components/runners/claude-code-runner/adapter.py index 419e493d2..3f450993d 100644 --- a/components/runners/claude-code-runner/adapter.py +++ b/components/runners/claude-code-runner/adapter.py @@ -107,6 +107,32 @@ def _timestamp(self) -> str: """Return current UTC timestamp in ISO format.""" return datetime.now(timezone.utc).isoformat() + async def _emit_developer_message( + self, message: str, thread_id: str, run_id: str + ) -> AsyncIterator[BaseEvent]: + """Helper to emit a developer role message (platform logging).""" + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta=message, + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + ) + async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEvent]: """ Process a run and yield AG-UI events. @@ -199,11 +225,27 @@ async def process_run(self, input_data: RunAgentInput) -> AsyncIterator[BaseEven if not user_message: logger.warning("No user message found in input") - yield RawEvent( - type=EventType.RAW, + # Emit as developer message (platform logging) + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta="No user message provided", + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, thread_id=thread_id, run_id=run_id, - event={"type": "system_log", "message": "No user message provided"} + message_id=msg_id, ) yield RunFinishedEvent( type=EventType.RUN_FINISHED, @@ -431,11 +473,27 @@ async def _run_claude_agent_sdk( try: options.continue_conversation = True logger.info("Enabled continue_conversation for session resumption") - yield RawEvent( - type=EventType.RAW, + # Emit as developer message (platform logging) + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, thread_id=thread_id, run_id=run_id, - event={"type": "system_log", "message": "🔄 Continuing conversation from previous state"} + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta="🔄 Continuing conversation from previous state", + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, ) except Exception as e: logger.warning(f"Failed to set continue_conversation: {e}") @@ -492,11 +550,27 @@ def create_sdk_client(opts, disable_continue=False): error_str = str(resume_error).lower() if "no conversation found" in error_str or "session" in error_str: logger.warning(f"Conversation continuation failed: {resume_error}") - yield RawEvent( - type=EventType.RAW, + # Emit as developer message (platform logging) + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + delta="⚠️ Could not continue conversation, starting fresh...", + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, thread_id=thread_id, run_id=run_id, - event={"type": "system_log", "message": "⚠️ Could not continue conversation, starting fresh..."} + message_id=msg_id, ) client = create_sdk_client(options, disable_continue=True) await client.connect() @@ -506,13 +580,29 @@ def create_sdk_client(opts, disable_continue=False): try: # Store client reference for interrupt support self._active_client = client - + if not self._first_run: - yield RawEvent( - type=EventType.RAW, + # Emit as developer message (platform logging) + msg_id = str(uuid.uuid4()) + yield TextMessageStartEvent( + type=EventType.TEXT_MESSAGE_START, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, + role="developer", + ) + yield TextMessageContentEvent( + type=EventType.TEXT_MESSAGE_CONTENT, thread_id=thread_id, run_id=run_id, - event={"type": "system_log", "message": "✅ Continuing conversation"} + message_id=msg_id, + delta="✅ Continuing conversation", + ) + yield TextMessageEndEvent( + type=EventType.TEXT_MESSAGE_END, + thread_id=thread_id, + run_id=run_id, + message_id=msg_id, ) logger.info("SDK continuing conversation from local state") @@ -927,30 +1017,30 @@ async def _prepare_workspace(self) -> AsyncIterator[BaseEvent]: try: if not workspace_has_git: - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": "📥 Cloning input repository..."} - ) + async for event in self._emit_developer_message( + "📥 Cloning input repository...", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event clone_url = self._url_with_token(input_repo, token) if token else input_repo await self._run_cmd(["git", "clone", "--branch", input_branch, "--single-branch", clone_url, str(workspace)], cwd=str(workspace.parent)) await self._run_cmd(["git", "remote", "set-url", "origin", clone_url], cwd=str(workspace), ignore_errors=True) elif reusing_workspace: - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": "✓ Preserving workspace (continuation)"} - ) + async for event in self._emit_developer_message( + "✓ Preserving workspace (continuation)", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event await self._run_cmd(["git", "remote", "set-url", "origin", self._url_with_token(input_repo, token) if token else input_repo], cwd=str(workspace), ignore_errors=True) else: - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": "🔄 Resetting workspace to clean state"} - ) + async for event in self._emit_developer_message( + "🔄 Resetting workspace to clean state", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event await self._run_cmd(["git", "remote", "set-url", "origin", self._url_with_token(input_repo, token) if token else input_repo], cwd=str(workspace)) await self._run_cmd(["git", "fetch", "origin", input_branch], cwd=str(workspace)) await self._run_cmd(["git", "checkout", input_branch], cwd=str(workspace)) @@ -969,12 +1059,12 @@ async def _prepare_workspace(self) -> AsyncIterator[BaseEvent]: except Exception as e: logger.error(f"Failed to prepare workspace: {e}") - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"Workspace preparation failed: {e}"} - ) + async for event in self._emit_developer_message( + f"Workspace preparation failed: {e}", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event # Create artifacts directory try: @@ -1001,30 +1091,30 @@ async def _prepare_multi_repo_workspace( repo_exists = repo_dir.exists() and (repo_dir / ".git").exists() if not repo_exists: - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"📥 Cloning {name}..."} - ) + async for event in self._emit_developer_message( + f"📥 Cloning {name}...", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event clone_url = self._url_with_token(url, token) if token else url await self._run_cmd(["git", "clone", "--branch", branch, "--single-branch", clone_url, str(repo_dir)], cwd=str(workspace)) await self._run_cmd(["git", "remote", "set-url", "origin", clone_url], cwd=str(repo_dir), ignore_errors=True) elif reusing_workspace: - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"✓ Preserving {name} (continuation)"} - ) + async for event in self._emit_developer_message( + f"✓ Preserving {name} (continuation)", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event await self._run_cmd(["git", "remote", "set-url", "origin", self._url_with_token(url, token) if token else url], cwd=str(repo_dir), ignore_errors=True) else: - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"🔄 Resetting {name} to clean state"} - ) + async for event in self._emit_developer_message( + f"🔄 Resetting {name} to clean state", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event await self._run_cmd(["git", "remote", "set-url", "origin", self._url_with_token(url, token) if token else url], cwd=str(repo_dir), ignore_errors=True) await self._run_cmd(["git", "fetch", "origin", branch], cwd=str(repo_dir)) await self._run_cmd(["git", "checkout", branch], cwd=str(repo_dir)) @@ -1046,12 +1136,12 @@ async def _prepare_multi_repo_workspace( except Exception as e: logger.error(f"Failed to prepare multi-repo workspace: {e}") - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"Workspace preparation failed: {e}"} - ) + async for event in self._emit_developer_message( + f"Workspace preparation failed: {e}", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event async def _validate_prerequisites(self): """Validate prerequisite files exist for phase-based slash commands.""" @@ -1130,22 +1220,22 @@ async def _clone_workflow_repository( temp_clone_dir = workspace / "workflows" / f"{workflow_name}-clone-temp" if workflow_dir.exists(): - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"✓ Workflow {workflow_name} already loaded"} - ) + async for event in self._emit_developer_message( + f"✓ Workflow {workflow_name} already loaded", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event return token = await self._fetch_token_for_url(git_url) - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"📥 Cloning workflow {workflow_name}..."} - ) + async for event in self._emit_developer_message( + f"📥 Cloning workflow {workflow_name}...", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event clone_url = self._url_with_token(git_url, token) if token else git_url await self._run_cmd(["git", "clone", "--branch", branch, "--single-branch", clone_url, str(temp_clone_dir)], cwd=str(workspace)) @@ -1155,29 +1245,29 @@ async def _clone_workflow_repository( if subdir_path.exists() and subdir_path.is_dir(): shutil.copytree(subdir_path, workflow_dir) shutil.rmtree(temp_clone_dir) - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"✓ Extracted workflow from: {path}"} - ) + async for event in self._emit_developer_message( + f"✓ Extracted workflow from: {path}", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event else: temp_clone_dir.rename(workflow_dir) - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"⚠️ Path '{path}' not found, using full repository"} - ) + async for event in self._emit_developer_message( + f"⚠️ Path '{path}' not found, using full repository", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event else: temp_clone_dir.rename(workflow_dir) - yield RawEvent( - type=EventType.RAW, - thread_id=self._current_thread_id or self.context.session_id, - run_id=self._current_run_id or "init", - event={"type": "system_log", "message": f"✅ Workflow {workflow_name} ready"} - ) + async for event in self._emit_developer_message( + f"✅ Workflow {workflow_name} ready", + self._current_thread_id or self.context.session_id, + self._current_run_id or "init" + ): + yield event async def _run_cmd(self, cmd, cwd=None, capture_stdout=False, ignore_errors=False): """Run a subprocess command asynchronously."""