diff --git a/backend/agents/database.py b/backend/agents/database.py
new file mode 100644
index 0000000..2e49f3b
--- /dev/null
+++ b/backend/agents/database.py
@@ -0,0 +1,113 @@
+from typing import Dict, Any, Optional
+from supabase import create_client
+from dotenv import load_dotenv
+import os
+
+load_dotenv()
+SUPABASE_URL = os.getenv('SUPABASE_URL')
+SUPABASE_KEY = os.getenv('SUPABASE_KEY')
+supabase = create_client(SUPABASE_URL, SUPABASE_KEY) if SUPABASE_URL and SUPABASE_KEY else None
+
+
+def _has_client() -> bool:
+    return supabase is not None
+
+
+async def create_result(pr_name: str, pr_link: str, overall_result: Dict[str, Any], run_status: str) -> Optional[int]:
+    """Insert a new row into results and return its id."""
+    try:
+        if not _has_client():
+            print("[db] Skipping create_result: SUPABASE not configured")
+            return None
+        payload = {
+            "pr_name": pr_name,
+            "pr_link": pr_link,
+            "overall_result": overall_result,
+            "run_status": run_status,
+        }
+        resp = supabase.table('results').insert(payload).execute()
+        row = (resp.data or [{}])[0]
+        print(f"[db] Created result id={row.get('id')} status={run_status}")
+        return row.get('id')
+    except Exception as e:
+        print(f"[db] ❌ create_result error: {str(e)}")
+        return None
+
+
+async def set_suite_result_id(suite_id: int, result_id: int) -> None:
+    """Link a suite to a result by setting suites.result_id."""
+    try:
+        if not _has_client():
+            print(f"[db] Skipping set_suite_result_id for suite {suite_id}: no client")
+            return
+        supabase.table('suites').update({"result_id": result_id}).eq('id', suite_id).execute()
+        print(f"[db] Linked suite {suite_id} -> result {result_id}")
+    except Exception as e:
+        print(f"[db] ❌ set_suite_result_id error: {str(e)}")
+
+
+async def get_or_create_test(suite_id: int, name: str) -> Optional[int]:
+    """Find a test row by (suite_id, name) or create it.
+    Returns the test id."""
+    try:
+        if not _has_client():
+            print(f"[db] Skipping get_or_create_test for suite {suite_id}, name '{name}': no client")
+            return None
+        # Try to find an existing row first
+        resp = supabase.table('tests').select('id').eq('suite_id', suite_id).eq('name', name).limit(1).execute()
+        if resp.data:
+            return resp.data[0]['id']
+        # Create a new row
+        payload = {
+            "suite_id": suite_id,
+            "name": name,
+            "steps": [],
+            "run_status": "RUNNING",
+        }
+        ins = supabase.table('tests').insert(payload).execute()
+        row = (ins.data or [{}])[0]
+        print(f"[db] Created test id={row.get('id')} for suite {suite_id}, name '{name}'")
+        return row.get('id')
+    except Exception as e:
+        print(f"[db] ❌ get_or_create_test error: {str(e)}")
+        return None
+
+
+async def append_test_step(test_id: int, step: Any) -> None:
+    """Append a single step to tests.steps immediately (read-modify-write)."""
+    try:
+        if not _has_client():
+            return
+        res = supabase.table('tests').select('steps').eq('id', test_id).limit(1).execute()
+        steps = []
+        if res.data:
+            curr = res.data[0].get('steps')
+            if isinstance(curr, list):
+                steps = curr
+        steps.append(step)
+        supabase.table('tests').update({"steps": steps}).eq('id', test_id).execute()
+    except Exception as e:
+        print(f"[db] ❌ append_test_step error: {str(e)}")
+
+
+async def update_test_fields(test_id: int, fields: Dict[str, Any]) -> None:
+    """Generic update of a tests row."""
+    try:
+        if not _has_client():
+            return
+        supabase.table('tests').update(fields).eq('id', test_id).execute()
+    except Exception as e:
+        print(f"[db] ❌ update_test_fields error: {str(e)}")
+
+
+async def get_result_id_for_suite(suite_id: int) -> Optional[int]:
+    """Return result_id for a given suite_id from the suites table."""
+    try:
+        if not _has_client():
+            return None
+        resp = supabase.table('suites').select('result_id').eq('id', suite_id).limit(1).execute()
+        if resp.data:
+            return resp.data[0].get('result_id')
+        return None
+    except Exception as e:
+        print(f"[db] ❌ get_result_id_for_suite error: {str(e)}")
+        return None
diff --git a/backend/agents/prompts.py b/backend/agents/prompts.py
new file mode 100644
index 0000000..44b54f2
--- /dev/null
+++ b/backend/agents/prompts.py
@@ -0,0 +1,63 @@
+from typing import List, Dict
+import os
+
+from dotenv import load_dotenv
+load_dotenv()
+
+def build_agent_instructions(tests: List[Dict], suite: Dict) -> str:
+    """Build optimized instructions for autonomous agent testing"""
+
+    base_url = os.getenv('DEPLOYMENT_URL', 'https://staging.example.com')
+
+    # Create comprehensive testing instructions
+    instructions = f"""
+You are an autonomous QA testing agent for web applications. Your goal is to thoroughly test the deployment at {base_url}.
+
+SUITE: {suite.get('name') or suite.get('suite_name') or 'default'}
+TOTAL TESTS: {len(tests)}
+
+TESTING APPROACH:
+1. Start by taking a screenshot to see the current state
+2. Navigate to the base URL: {base_url}
+3. Perform comprehensive exploratory testing
+4. Look for bugs, broken functionality, and edge cases
+5. Test user flows and interactions
+6. Pay special attention to recent changes that might have introduced issues
+
+SPECIFIC TEST SCENARIOS:
+"""
+
+    for i, test in enumerate(tests, 1):
+        instructions += f"""
+{i}. {test['name']}
+   Description: {test.get('summary', 'No description provided')}
+   Priority: {'HIGH' if 'critical' in test.get('summary', '').lower() else 'MEDIUM'}
+"""

+    instructions += """
+
+TESTING GUIDELINES:
+- Be thorough and methodical in your approach
+- Take screenshots at key moments to document your findings
+- Test both happy paths and edge cases
+- Look for unexpected behaviors, errors, or broken functionality
+- Pay attention to UI/UX issues and usability problems
+- Test form submissions, navigation, and interactive elements
+- Check for responsive design and mobile compatibility if applicable
+- Document any bugs or issues you discover with clear descriptions
+
+SUCCESS CRITERIA:
+- Complete testing of all specified scenarios
+- Identify and document any bugs or issues found
+- Verify that core functionality works as expected
+- Provide clear feedback on the overall quality of the deployment
+
+Remember: You are looking for unexpected bugs and issues that developers might miss. Be creative in your testing approach and explore edge cases.
+
+FINAL VERDICT FORMAT (MANDATORY):
+- After completing each test scenario, output exactly one line with no extra commentary:
+- RESULT: PASSED (if the scenario executed successfully and no critical issues were found)
+- RESULT: FAILED (if execution could not complete or a critical/blocking issue was found)
+"""
+
+    return instructions
\ No newline at end of file
diff --git a/backend/record_lib.py b/backend/agents/record.py
similarity index 95%
rename from backend/record_lib.py
rename to backend/agents/record.py
index f6aaba4..e385b67 100644
--- a/backend/record_lib.py
+++ b/backend/agents/record.py
@@ -12,14 +12,6 @@
 - State is stored at /tmp/cua_recorder/state.json
 """

-import os
-import json
-import time
-import signal
-import platform
-import subprocess
-from pathlib import Path
-
 def start_recording(output_dir=None, fps=None, width=None, height=None, display=None):
     """Start ffmpeg screen recording in background and persist PID.
@@ -115,6 +107,8 @@ def stop_recording(upload_url=None):
     import time as _time
     import signal as _signal
     from pathlib import Path as _Path
+    import urllib.request as _urlreq
+    import uuid as _uuid

     state_path = _Path("/tmp/cua_recorder/state.json")
     data = {}
@@ -153,14 +147,13 @@ def stop_recording(upload_url=None):
     # Try upload to server
     upload = {"ok": False}
     try:
-        import urllib.request as _urlreq
-        import uuid as _uuid
-        import os as _os
+        VIDEO_UPLOAD_URL = "https://qai-ashy.vercel.app/upload-video"

-        # Determine upload URL: param overrides env, skip if none
+        # Determine upload URL: param overrides the hardcoded default
         _url = upload_url
         if not _url:
-            _url = _os.getenv("VIDEO_UPLOAD_URL")
+            _url = VIDEO_UPLOAD_URL
+        print(f"Upload URL: {_url}")

         if path and _os.path.exists(path) and _url:
             boundary = f"----WebKitFormBoundary{_uuid.uuid4().hex}"
@@ -177,7 +170,7 @@ def stop_recording(upload_url=None):
             body_prefix = ("".join(parts)).encode("utf-8")
             body_suffix = (CRLF + f"--{boundary}--{CRLF}").encode("utf-8")
             body = body_prefix + file_bytes + body_suffix
-
+            print(f"Body: {len(body)} bytes")
             req = _urlreq.Request(
                 url=_url,
                 data=body,
@@ -196,9 +189,10 @@ def stop_recording(upload_url=None):
                 upload = {"ok": True, "response": upload_json}
         except Exception as _e:
             upload = {"ok": False, "error": repr(_e)}
-
+        print(f"Upload: {upload}")
         return {"ok": True, "path": path, "upload": upload}
     except Exception as e:
+        print(f"Error: {e}")
         return {"ok": False, "error": repr(e)}
diff --git a/backend/agents/run_suite.py b/backend/agents/run_suite.py
index e62012d..aba570e 100644
--- a/backend/agents/run_suite.py
+++ b/backend/agents/run_suite.py
@@ -5,7 +5,6 @@
 """
 import sys
 import asyncio
-import os
 from pathlib import Path

 # Add the agents directory to the Python path
diff --git a/backend/agents/runner.py b/backend/agents/runner.py
index 1577a71..3f83c51 100644
--- a/backend/agents/runner.py
+++ b/backend/agents/runner.py
@@ -3,432 +3,216 @@
 import os
 import json
 import asyncio
-from pathlib import Path
-from typing import Any, Dict, List, Optional
-try:
-    from supabase import create_client
-except ImportError:
-    print("⚠️ supabase-py not installed. Install with: pip install supabase")
-    create_client = None
+from typing import Any, Dict, List
 from dotenv import load_dotenv
-
-from .utils import slugify, utc_now_iso, short_id, build_system_instructions, normalize_tests, make_remote_recording_dir
-
-def build_autonomous_test_instructions(tests: List[Dict], suite: Dict) -> str:
-    """Build optimized instructions for autonomous agent testing"""
-
-    base_url = os.getenv('DEPLOYMENT_URL', 'https://staging.example.com')
-
-    # Create comprehensive testing instructions
-    instructions = f"""
-You are an autonomous QA testing agent for web applications. Your goal is to thoroughly test the deployment at {base_url}.
-
-SUITE: {suite['name']}
-TOTAL TESTS: {len(tests)}
-
-TESTING APPROACH:
-1. Start by taking a screenshot to see the current state
-2. Navigate to the base URL: {base_url}
-3. Perform comprehensive exploratory testing
-4. Look for bugs, broken functionality, and edge cases
-5. Test user flows and interactions
-6. Pay special attention to recent changes that might have introduced issues
-
-SPECIFIC TEST SCENARIOS:
-"""
-
-    for i, test in enumerate(tests, 1):
-        instructions += f"""
-{i}. {test['name']}
-   Description: {test.get('summary', 'No description provided')}
-   Priority: {'HIGH' if 'critical' in test.get('summary', '').lower() else 'MEDIUM'}
-"""
-
-    instructions += f"""
-
-TESTING GUIDELINES:
-- Be thorough and methodical in your approach
-- Take screenshots at key moments to document your findings
-- Test both happy paths and edge cases
-- Look for unexpected behaviors, errors, or broken functionality
-- Pay attention to UI/UX issues and usability problems
-- Test form submissions, navigation, and interactive elements
-- Check for responsive design and mobile compatibility if applicable
-- Document any bugs or issues you discover with clear descriptions
-
-SUCCESS CRITERIA:
-- Complete testing of all specified scenarios
-- Identify and document any bugs or issues found
-- Verify that core functionality works as expected
-- Provide clear feedback on the overall quality of the deployment
-
-Remember: You are looking for unexpected bugs and issues that developers might miss. Be creative in your testing approach and explore edge cases.
-"""
-
-    return instructions
+from enum import Enum
+
+from .database import (
+    get_or_create_test,
+    append_test_step,
+    update_test_fields,
+    create_result,
+    set_suite_result_id,
+)
+from .prompts import build_agent_instructions
+from .utils import normalize_tests, make_remote_recording_dir, process_item
+from .record import start_recording, stop_recording
+
+class RunStatus(Enum):
+    QUEUED = "QUEUED"
+    RUNNING = "RUNNING"
+    PASSED = "PASSED"
+    FAILED = "FAILED"

 # Load environment variables
 load_dotenv()

-# TODO: Replace with DB storage in the future
-ARTIFACTS_ROOT = Path(__file__).parent.parent / "artifacts"
-
-# Database client setup
-SUPABASE_URL = os.getenv('SUPABASE_URL')
-SUPABASE_KEY = os.getenv('SUPABASE_KEY')
-supabase = create_client(SUPABASE_URL, SUPABASE_KEY) if create_client and SUPABASE_URL and SUPABASE_KEY else None
-
-
-async def run_single_agent(spec: Dict[str, Any], run_dir: Path, suite_id: Optional[int] = None) -> Dict[str, Any]:
-    persona = spec.get("persona") or "agent"
-    persona_slug = slugify(str(persona))
-    # TODO: dynamically route to the appropriate model
-    model = spec.get("model") or os.getenv("CUA_MODEL", "anthropic/claude-3-5-sonnet-20241022")
-    budget = spec.get("budget", 5.0)
-    max_duration_sec: Optional[float] = spec.get("max_duration_sec")
-    os_type = "linux"
-    provider_type = "cloud"
-    container_name = spec.get("container_name") or os.getenv("CUA_CONTAINER_NAME")
-    api_key = os.getenv("CUA_API_KEY")
-    if not api_key:
-        raise RuntimeError("CUA_API_KEY is required")
-    if not container_name:
-        raise RuntimeError("CUA_CONTAINER_NAME is required")
-
-    # Tests: use explicit tests list or fallback to single test from top-level instructions/messages
-    tests = normalize_tests(spec)
-
-    # TODO: Replace with DB storage in the future
-    agent_dir = run_dir / persona_slug
-    agent_dir.mkdir(parents=True, exist_ok=True)
-
-    async def _execute() -> Dict[str, Any]:
-        steps: List[Dict[str, Any]] = []
-        texts: List[str] = []
-        usage_snapshots: List[Dict[str, Any]] = []
-        status = "success"
-        error: Optional[str] = None
-
-        async with Computer(
-            os_type=os_type,
-            provider_type=provider_type,
-            name=container_name,
-            api_key=api_key
-        ) as computer:
-            callbacks = []
-
-            agent = ComputerAgent(
-                model=model,
-                tools=[computer],
-                trajectory_dir=str(agent_dir),
-                max_trajectory_budget=budget,
-                instructions=build_system_instructions(persona),
-                callbacks=callbacks,
-            )
-
-            # Open the browser before starting agent steps
-            try:
-                await computer.interface.left_click(536, 742)
-                print(f"[{persona_slug}] opened browser successfully")
-            except Exception:
-                print(f"[{persona_slug}] opened browser failed")
-                pass
-
-            for test in tests:
-                test_name = test.get("name", "test")
-                test_messages = test.get("messages") or []
-
-                # Start recording inside VM
-                recording_info = None
-                try:
-                    from record_lib import start_recording, stop_recording  # type: ignore
-                except Exception:
-                    start_recording = None
-                    stop_recording = None
-
-                if start_recording:
-                    try:
-                        remote_dir = make_remote_recording_dir(persona_slug, test_name)
-                        recording_info = await computer.venv_exec("demo_venv", start_recording, output_dir=remote_dir, fps=5)
-                        print(f"[{persona_slug}] recording started for {test_name}: {recording_info}")
-                    except Exception as _e:
-                        print(f"[{persona_slug}] recording start failed for {test_name}: {_e}")
-
-                try:
-                    async for result in agent.run(test_messages):
-                        usage = result.get("usage")
-                        if usage is not None:
-                            try:
-                                usage_snapshots.append(json.loads(json.dumps(usage, default=str)))
-                            except Exception:
-                                pass
-
-                        for item in result.get("output", []):
-                            item_type = item.get("type")
-                            if item_type == "message":
-                                try:
-                                    content = item.get("content") or []
-                                    for block in content:
-                                        if isinstance(block, dict) and block.get("text"):
-                                            print(f"[{persona_slug}] message: {block['text']}")
-                                            texts.append(block["text"])
-                                except Exception:
-                                    pass
-                            elif item_type in ("computer_call", "computer_call_output", "function_call", "function_call_output"):
-                                pruned = dict(item)
-                                if pruned.get("type") == "computer_call_output":
-                                    output = pruned.get("output", {})
-                                    if isinstance(output, dict) and "image_url" in output:
-                                        output = dict(output)
-                                        # Note: actual screenshots are saved under the trajectory dir
-                                        output["image_url"] = "[omitted]"
-                                        pruned["output"] = output
-                                    print(f"[{persona_slug}] computer_call_output: screenshot captured (image omitted)")
-                                elif pruned.get("type") == "computer_call":
-                                    action = pruned.get("action", {}) or {}
-                                    a_type = action.get("type", "unknown")
-                                    a_args = {k: v for k, v in action.items() if k != "type"}
-                                    print(f"[{persona_slug}] computer_call: {a_type}({a_args})")
-                                elif pruned.get("type") == "function_call":
-                                    fname = pruned.get("name", "")
-                                    print(f"[{persona_slug}] function_call: {fname}")
-                                elif pruned.get("type") == "function_call_output":
-                                    print(f"[{persona_slug}] function_call_output: received")
-                                steps.append(pruned)
-                except Exception as e:
-                    status = "error"
-                    error = repr(e)
-                finally:
-                    if stop_recording:
-                        try:
-                            rec_stop = await computer.venv_exec("demo_venv", stop_recording)
-                            print(f"[{persona_slug}] recording stopped for {test_name}: {rec_stop}")
-                        except Exception as _e:
-                            print(f"[{persona_slug}] recording stop failed for {test_name}: {_e}")
-        result = {
-            "persona": persona,
-            "persona_slug": persona_slug,
-            "model": model,
-            "container_name": container_name,
-            "trajectory_dir": str(agent_dir),
-            "status": status,
-            "error": error,
-            "texts": texts,
-            "steps": steps,
-            "usage": usage_snapshots,
-            "suite_id": suite_id,
-        }
-
-        # Save results to database if available
-        if supabase and suite_id:
-            await save_agent_results_to_db(result, suite_id)
-
-        return result
-
-    if max_duration_sec and max_duration_sec > 0:
-        return await asyncio.wait_for(_execute(), timeout=float(max_duration_sec))
-    return await _execute()
-
-
-async def save_agent_results_to_db(agent_result: Dict[str, Any], suite_id: int):
-    """Save individual agent test results to database"""
-    try:
print(f"[{agent_result['persona_slug']}] Saving results to database for suite {suite_id}") - - # Update suite success status - suite_success = agent_result["status"] == "success" - - # If we have S3 integration, save trajectory/screenshots there and get S3 link - s3_link = None # TODO: Implement S3 upload for trajectory files +async def run_single_agent(spec: Dict[str, Any]) -> Dict[str, Any]: + # Setup CUA agent + model = spec.get("model") or os.getenv("CUA_MODEL", "anthropic/claude-3-5-sonnet-20241022") + budget = spec.get("budget", 5.0) + suite_id = spec.get("suite_id") + + # Setup CUA computer + os_type = "linux" + provider_type = "cloud" + container_name = spec.get("container_name") or os.getenv("CUA_CONTAINER_NAME") + api_key = os.getenv("CUA_API_KEY") + if not api_key: + raise RuntimeError("CUA_API_KEY is required") + if not container_name: + raise RuntimeError("CUA_CONTAINER_NAME is required") + + # Setup tests + tests = normalize_tests(spec) + + async def _execute() -> Dict[str, Any]: + # Results from all tests from the suite + suite_results: List[Dict[str, Any]] = [] - # Update the suite record with results - suite_update = { - 'suites-success': suite_success, - 's3-link': s3_link - } + def _prepare_step_for_storage(item: Dict[str, Any]): + t = item.get("type") + if t == "message": + try: + content = item.get("content") or [] + for block in content: + if isinstance(block, dict) and block.get("text"): + return block["text"] + except Exception: + return item + elif t in ("computer_call", "computer_call_output", "function_call", "function_call_output"): + pruned = dict(item) + if pruned.get("type") == "computer_call_output": + output = pruned.get("output", {}) + if isinstance(output, dict) and "image_url" in output: + output = dict(output) + output["image_url"] = "[omitted]" + pruned["output"] = output + return pruned + return item - response = supabase.table('suites').update(suite_update).eq('id', suite_id).execute() - - if response.data: - print(f"[{agent_result['persona_slug']}] ✅ Updated suite {suite_id} with success status: {suite_success}") - else: - print(f"[{agent_result['persona_slug']}] ❌ Failed to update suite {suite_id}") + async with Computer( + os_type=os_type, + provider_type=provider_type, + name=container_name, + api_key=api_key + ) as computer: - # Get all tests for this suite and update them with results - tests_response = supabase.table('tests').select('*').eq('suite_id', suite_id).execute() - - if tests_response.data: - print(f"[{agent_result['persona_slug']}] Found {len(tests_response.data)} tests to update") + agent = ComputerAgent( + model=model, + tools=[computer], + max_trajectory_budget=budget, + instructions=build_agent_instructions(tests, spec), + ) - # For now, mark all tests in the suite with the same status as the agent - # In future, could parse individual test results from agent steps - test_updates = [] - for test in tests_response.data: - test_update = { - 'id': test['id'], - 'test-success': suite_success, - 'summary': f"Agent {agent_result['persona']} completed with status: {agent_result['status']}" - } - if agent_result.get('error'): - test_update['summary'] += f" Error: {agent_result['error']}" - test_updates.append(test_update) + # Open the browser before starting agent steps + try: + await computer.interface.left_click(536, 742) + print(f"[Agent {suite_id}] opened browser successfully") + except Exception: + print(f"[Agent{suite_id}] opened browser failed") + pass + + for test in tests: + test_name = test.get("name", "test") + 
test_instructions = test.get("instructions") or [] - # Update all tests in batch - for test_update in test_updates: - test_response = supabase.table('tests').update({ - 'test-success': test_update['test-success'], - 'summary': test_update['summary'] - }).eq('id', test_update['id']).execute() + # Per-test accumulators + test_agent_steps: List[Dict[str, Any]] = [] + test_run_status = RunStatus.RUNNING - print(f"[{agent_result['persona_slug']}] Updated test {test_update['id']} - success: {test_update['test-success']}") + # Ensure DB row exists for this test + test_id = await get_or_create_test(suite_id, test_name) if suite_id is not None else None - except Exception as e: - print(f"[{agent_result['persona_slug']}] ❌ Database save error: {str(e)}") - - -async def load_suite_from_db(suite_id: int) -> Dict[str, Any]: - """Load suite and associated tests from database""" - try: - print(f"Loading suite {suite_id} from database...") - - # Get suite details - suite_response = supabase.table('suites').select('*').eq('id', suite_id).single().execute() - - if not suite_response.data: - raise RuntimeError(f"Suite {suite_id} not found") - - suite = suite_response.data - print(f"✅ Loaded suite: {suite['name']}") - - # Get associated tests - tests_response = supabase.table('tests').select('*').eq('suite_id', suite_id).execute() - - tests = tests_response.data or [] - print(f"✅ Loaded {len(tests)} tests for suite {suite_id}") - - return { - 'suite': suite, - 'tests': tests - } + # Start recording inside VM + try: + remote_dir = make_remote_recording_dir(suite_id, test_name) + await computer.venv_exec("demo_venv", start_recording, output_dir=remote_dir, fps=5) + print(f"[Agent {suite_id}] recording started for {test_name}") + except Exception as _e: + print(f"[Agent {suite_id}] recording start failed for {test_name}: {_e}") + + try: + async for result in agent.run(test_instructions): + for item in result.get("output", []): + # Add agent's current step + test_agent_steps = process_item(item, suite_id, test_agent_steps) + # Persist step immediately to DB + if test_id is not None: + step_payload = _prepare_step_for_storage(item) + await append_test_step(test_id, step_payload) + # Parse explicit verdict from agent message content + try: + if isinstance(item, dict) and item.get("type") == "message": + content = item.get("content") or [] + for block in content: + text = block.get("text") if isinstance(block, dict) else None + if isinstance(text, str): + cleaned = text.strip().upper() + if cleaned.endswith("RESULT: PASSED") or cleaned == "RESULT: PASSED": + test_run_status = RunStatus.PASSED + elif cleaned.endswith("RESULT: FAILED") or cleaned == "RESULT: FAILED": + test_run_status = RunStatus.FAILED + except Exception: + pass + except Exception as e: + test_run_status = RunStatus.FAILED + print(f"[Agent {suite_id}] test {test_name} failed: {e}") + finally: + # Determine pass/fail + passed = test_run_status == RunStatus.PASSED + s3_link = None + + # Stop recording and get S3 URL + try: + recording_stop = await computer.venv_exec("demo_venv", stop_recording) + if isinstance(recording_stop, dict): + upload = recording_stop.get("upload") or {} + resp = upload.get("response") or {} + s3_link = resp.get("fileUrl") or resp.get("url") + print(f"[Agent {suite_id}] recording stopped for {test_name}") + except Exception as e: + print(f"[Agent {suite_id}] stop_recording error for {test_name}: {e}") + pass + + # Persist final test fields + if test_id is not None: + await update_test_fields(test_id, { + "test_success": passed, 
+ "s3_link": s3_link, + "run_status": test_run_status.value, + }) + + # Add test result to suite results + suite_results.append({ + "suite_id": suite_id, + "name": test_name, + "test_success": passed, + "steps": test_agent_steps, + "s3_link": s3_link, + "run_status": test_run_status, + }) - except Exception as e: - print(f"❌ Failed to load suite {suite_id}: {str(e)}") - raise - - -async def run_agents_from_db(suite_id: int) -> Dict[str, Any]: - """Run agents using suite and tests loaded from database""" - if not supabase: - raise RuntimeError("Database connection not available. Check SUPABASE_URL and SUPABASE_KEY environment variables.") - - # Load suite and tests from database - suite_data = await load_suite_from_db(suite_id) - suite = suite_data['suite'] - tests = suite_data['tests'] - - print(f"🚀 Running agents for suite: {suite['name']}") + return suite_results - # Convert database tests to agent specs format - # Create optimized instructions for autonomous testing - persona = suite['name'].replace(' Agent Suite', '').lower() - - # Build comprehensive test instructions - test_instructions = build_autonomous_test_instructions(tests, suite) - - test_spec = { - "persona": persona, - "instructions": test_instructions, - "model": os.getenv("CUA_MODEL", "anthropic/claude-3-5-sonnet-20241022"), - "budget": 10.0, # Increased budget for comprehensive testing - "max_duration_sec": float(os.getenv("AGENT_TIMEOUT", "600")) if os.getenv("AGENT_TIMEOUT") else None, # Increased timeout - "container_name": os.getenv("CUA_CONTAINER_NAME") - } - - run_id = short_id() - started_at = utc_now_iso() - artifacts_dir = ARTIFACTS_ROOT - artifacts_dir.mkdir(parents=True, exist_ok=True) - - print(f"📋 Test spec created for persona: {test_spec['persona']}") - print(f"📋 Instructions: {test_spec['instructions']}") - - # Run the single agent with the suite - agent_result = await run_single_agent(test_spec, artifacts_dir, suite_id) - - summary = { - "run_id": run_id, - "started_at": started_at, - "finished_at": utc_now_iso(), - "artifacts_root": str(artifacts_dir), - "suite_id": suite_id, - "suite_name": suite['name'], - "num_tests": len(tests), - "agent_result": agent_result, - } - - # Save summary locally (keep existing behavior) - output_path = os.getenv("CUA_OUTPUT_PATH") or str(artifacts_dir / f"summary_suite_{suite_id}.json") - try: - with open(output_path, "w", encoding="utf-8") as f: - json.dump(summary, f, indent=2) - print(f"💾 Summary saved to {output_path}") - except Exception as e: - print(f"❌ Failed to save summary: {str(e)}") - pass - - print(json.dumps(summary, indent=2)) - return summary - + return await _execute() -async def run_agents(test_specs: List[Dict[str, Any]], suite_id: Optional[int] = None) -> Dict[str, Any]: - run_id = short_id() - started_at = utc_now_iso() - artifacts_dir = ARTIFACTS_ROOT - artifacts_dir.mkdir(parents=True, exist_ok=True) - - tasks = [run_single_agent(spec, artifacts_dir, suite_id) for spec in test_specs] +async def run_agents(test_specs: List[Dict[str, Any]], pr_name: str, pr_link: str) -> Dict[str, Any]: + tasks = [run_single_agent(spec) for spec in test_specs] results: List[Dict[str, Any]] = await asyncio.gather(*tasks, return_exceptions=True) - - agents: List[Dict[str, Any]] = [] - for spec, res in zip(test_specs, results): + + total_tests = 0 + passed_tests = 0 + failed_tests = 0 + for res in results: if isinstance(res, Exception): - agents.append({ - "persona": spec.get("persona") or "agent", - "status": "error", - "error": repr(res), - }) - else: - 
-            agents.append(res)
-
-    summary = {
-        "run_id": run_id,
-        "started_at": started_at,
-        "finished_at": utc_now_iso(),
-        "artifacts_root": str(artifacts_dir),
-        "num_agents": len(test_specs),
-        "agents": agents,
-    }
-
-    output_path = os.getenv("CUA_OUTPUT_PATH") or str(artifacts_dir / "summary.json")
+            continue
+        for t in res:
+            total_tests += 1
+            if t.get("test_success"):
+                passed_tests += 1
+            else:
+                failed_tests += 1
+    run_status = RunStatus.PASSED if failed_tests == 0 else RunStatus.FAILED
+    overall_result = {
+        "passed_tests": passed_tests,
+        "failed_tests": failed_tests,
+        "total_tests": total_tests,
+    }
+    # Create result row and link suites
     try:
-        with open(output_path, "w", encoding="utf-8") as f:
-            json.dump(summary, f, indent=2)
-    except Exception:
-        pass
-
+        result_id = await create_result(pr_name, pr_link, overall_result, run_status.value)
+        if result_id is not None:
+            suite_ids = {spec.get("suite_id") for spec in test_specs if spec.get("suite_id") is not None}
+            for sid in suite_ids:
+                await set_suite_result_id(int(sid), int(result_id))
+    except Exception as _e:
+        print(f"[db] result update error: {_e}")
+    summary = {
+        "pr_name": pr_name,
+        "pr_link": pr_link,
+        "overall_result": overall_result,
+        "run_status": run_status.value,
+    }
     print(json.dumps(summary))
     return summary
-
-
-# Main entry point that can handle both old format (test_specs) and new format (suite_id)
-async def run_qai_tests(suite_id: Optional[int] = None, test_specs: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
-    """Main entry point for running QAI tests - supports both database and legacy modes"""
-    if suite_id is not None:
-        print(f"🗄️ Running tests from database suite {suite_id}")
-        return await run_agents_from_db(suite_id)
-    elif test_specs is not None:
-        print(f"📝 Running tests from provided specs (legacy mode)")
-        return await run_agents(test_specs, suite_id)
-    else:
-        raise ValueError("Either suite_id or test_specs must be provided")
-
diff --git a/backend/agents/utils.py b/backend/agents/utils.py
index aff216f..d954d0f 100644
--- a/backend/agents/utils.py
+++ b/backend/agents/utils.py
@@ -1,4 +1,3 @@
-import uuid
 from datetime import datetime, timezone

@@ -12,40 +11,66 @@
 def utc_now_iso() -> str:
     return datetime.now(timezone.utc).isoformat()

-def short_id() -> str:
-    return uuid.uuid4().hex[:8]
-
-
-def build_system_instructions(persona: str | None) -> str:
-    persona_text = f"You are {persona}." if persona else "You are a meticulous QA tester."
-    return (
-        f"{persona_text} "
-        "The browser is already open. "
-        "Act like a real end user for this persona. "
-        "Follow the user's instructions step-by-step, using the computer tools when needed. "
-        "If you want to click at the current cursor position, use the 'left_click' action and leave out the coordinates (doing 0,0 will click at the top left corner, not the current cursor position)."
-        "DO NOT DO action: {'button': 'left', 'type': 'click', 'x': 0, 'y': 0}, leave out the coordinates."
-        "Be concise, avoid hallucinations, and surface any errors encountered."
-    )
-
-
 def normalize_tests(spec: dict) -> list[dict]:
     tests = spec.get("tests")
+
+    # Multiple tests
     if isinstance(tests, list) and tests:
         normalized = []
         for idx, t in enumerate(tests):
             name = t.get("name") or f"test-{idx+1}"
-            messages = t.get("instructions") or t.get("messages") or []
-            normalized.append({"name": name, "messages": messages})
+            instructions = t.get("instructions") or []
+            normalized.append({"name": name, "instructions": instructions})
         return normalized
-    # Fallback: single test using top-level instructions/messages
-    messages = spec.get("instructions") or spec.get("messages") or []
-    suite_name = spec.get("suite") or spec.get("suite_name") or "default"
-    return [{"name": str(suite_name), "messages": messages}]
+
+    # Single test
+    instructions = spec.get("instructions") or []
+    suite_name = spec.get("suite_name") or "Standard"
+    return [{"name": str(suite_name), "instructions": instructions}]


-def make_remote_recording_dir(persona_slug: str, test_name: str) -> str:
+def make_remote_recording_dir(suite_id: int, test_name: str) -> str:
     test_slug = slugify(str(test_name))
     # Use a user-writable base path by default
-    return f"/tmp/replays/{persona_slug}/{test_slug}"
+    return f"/tmp/replays/{suite_id}/{test_slug}"
+
+def process_item(item: dict, suite_id: int, test_agent_steps: list) -> list:
+    item_type = item.get("type")
+
+    if item_type == "message":
+        try:
+            content = item.get("content") or []
+            for block in content:
+                if isinstance(block, dict) and block.get("text"):
+                    print(f"[Agent {suite_id}] message: {block['text']}")
+                    test_agent_steps.append(block["text"])
+        except Exception:
+            pass
+
+    elif item_type in ("computer_call", "computer_call_output", "function_call", "function_call_output"):
+        pruned = dict(item)
+        if pruned.get("type") == "computer_call_output":
+            output = pruned.get("output", {})
+            if isinstance(output, dict) and "image_url" in output:
+                output = dict(output)
+                output["image_url"] = "[omitted]"
+                pruned["output"] = output
+            print(f"[Agent {suite_id}] computer_call_output: screenshot captured")
+
+        elif pruned.get("type") == "computer_call":
+            action = pruned.get("action", {}) or {}
+            a_type = action.get("type", "unknown")
+            a_args = {k: v for k, v in action.items() if k != "type"}
+            print(f"[Agent {suite_id}] computer_call: {a_type}({a_args})")
+
+        elif pruned.get("type") == "function_call":
+            fname = pruned.get("name", "")
+            print(f"[Agent {suite_id}] function_call: {fname}")
+
+        elif pruned.get("type") == "function_call_output":
+            print(f"[Agent {suite_id}] function_call_output: received")
+
+        test_agent_steps.append(pruned)
+
+    return test_agent_steps
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 6fd3dd4..c757e7a 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -1,2 +1,3 @@
 cua-agent>=0.4.31
-cua-computer>=0.4.5
\ No newline at end of file
+cua-computer>=0.4.5
+supabase>=2.18.1
\ No newline at end of file
diff --git a/backend/agent_test.py b/backend/tests/agent_test.py
similarity index 98%
rename from backend/agent_test.py
rename to backend/tests/agent_test.py
index 75f99af..a948852 100644
--- a/backend/agent_test.py
+++ b/backend/tests/agent_test.py
@@ -2,7 +2,7 @@
 import os

 from dotenv import load_dotenv, find_dotenv
-from agents.runner import run_agents
+from ..agents.runner import run_agents


 def main() -> None:
diff --git a/backend/record_test.py b/backend/tests/record_test.py
similarity index 71%
rename from backend/record_test.py
rename to backend/tests/record_test.py
index 7402141..3b7e0d5 100644
--- a/backend/record_test.py
+++ b/backend/tests/record_test.py
@@ -1,7 +1,7 @@
 from computer import Computer
 import os
 from dotenv import load_dotenv, find_dotenv
-import record_lib
+from ..agents.record import start_recording, stop_recording
 import asyncio

 load_dotenv(find_dotenv())
@@ -19,10 +19,10 @@ async def main():
         api_key=api_key
     ) as computer:
         await computer.venv_install("demo_venv", [])
-        await computer.venv_exec("demo_venv", record_lib.start_recording, output_dir="/tmp/replays", fps=5)
+        await computer.venv_exec("demo_venv", start_recording, output_dir="/tmp/replays", fps=5)

         await asyncio.sleep(5)

-        await computer.venv_exec("demo_venv", record_lib.stop_recording)
+        await computer.venv_exec("demo_venv", stop_recording)


 asyncio.run(main())
\ No newline at end of file
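
For reference, a minimal driver sketch showing how the refactored run_agents() entry point is expected to be invoked. The spec shape follows normalize_tests() and run_single_agent() above; the suite id, suite name, test instructions, and PR metadata are hypothetical placeholders, not part of this diff. CUA_API_KEY and CUA_CONTAINER_NAME (or a "container_name" key in the spec) must be set; the Supabase writes are skipped gracefully when SUPABASE_URL/SUPABASE_KEY are unset, so the sketch runs without a database.

# usage_sketch.py -- hypothetical example, not part of this diff
import asyncio

from agents.runner import run_agents

specs = [
    {
        "suite_id": 1,            # assumed to exist in the suites table
        "suite_name": "Smoke",    # used as the test name for single-test specs
        "budget": 5.0,
        "tests": [
            {
                "name": "landing page loads",
                "instructions": "Open the deployment, verify the landing page renders, "
                                "then end with RESULT: PASSED or RESULT: FAILED.",
            },
        ],
    },
]

# run_agents gathers one run_single_agent task per spec, tallies pass/fail
# counts, and returns the summary dict it also prints as JSON.
summary = asyncio.run(
    run_agents(specs, pr_name="Example PR", pr_link="https://github.com/org/repo/pull/1")
)
print(summary["run_status"], summary["overall_result"])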