diff --git a/prompts/critical_systems_thinking.jinja b/prompts/critical_systems_thinking.jinja new file mode 100644 index 0000000..de90fcb --- /dev/null +++ b/prompts/critical_systems_thinking.jinja @@ -0,0 +1,22 @@ +You are an AI assistant specialized in critical systems thinking. Your primary function is to analyze complex problems, propose solutions, and execute tasks autonomously when required. + +You have the following capabilities: +1. Perform structured analysis using the MECE principle. +2. Search for information using web APIs. +3. Analyze and clarify unclear requests. +4. Review web page content. +5. Consider and propose solutions to problems. +6. Conduct stakeholder analysis. +7. Generate future scenarios. +8. Create system maps. +9. Perform risk assessments. +10. Conduct ethical analyses. +11. Run tasks autonomously in the background. + +When using the autonomous_loop tool: +- You can start a background task that simulates work. +- While the background task is running, you can still respond to new messages from the user. +- When the background task completes, you should provide an update on its completion. +- You should be able to handle multiple tasks in the background and keep track of their progress. + +Always strive to provide clear, concise, and well-structured responses. When analyzing problems or proposing solutions, consider multiple perspectives and potential consequences. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 71f1d25..246dac9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,9 @@ requires-python = ">=3.10" dependencies = [ "ai-exchange>=0.8.4", "goose-ai>=0.9.8", + "requests>=2.32.3", + "selenium>=4.25.0", + "webdriver-manager>=4.0.2", ] author = [{ name = "Block", email = "ai-oss-tools@block.xyz" }] packages = [{ include = "goose_plugins", from = "src" }] @@ -19,6 +22,7 @@ goose-plugins = "goose_plugins:module_name" [project.entry-points."goose.toolkit"] artify = "goose_plugins.toolkits.artify:VincentVanCode" +critical_systems_thinking = "goose_plugins.toolkits.critical_systems_thinking:CriticalSystemsThinking" [build-system] diff --git a/src/goose_plugins/cli.py b/src/goose_plugins/cli.py index e69de29..793956a 100644 --- a/src/goose_plugins/cli.py +++ b/src/goose_plugins/cli.py @@ -0,0 +1,15 @@ +import click +from goose_plugins.utils.log_streamer import start_log_stream + +@click.group() +def cli(): + pass + +@cli.command() +def stream_log(): + """Stream the current Goose session log in real-time.""" + click.echo("Starting log stream for the current Goose session...") + start_log_stream() + +if __name__ == '__main__': + cli() \ No newline at end of file diff --git a/src/goose_plugins/toolkits/critical_systems_thinking.py b/src/goose_plugins/toolkits/critical_systems_thinking.py new file mode 100644 index 0000000..7586d79 --- /dev/null +++ b/src/goose_plugins/toolkits/critical_systems_thinking.py @@ -0,0 +1,216 @@ +from exchange import Exchange, Message, Text +from exchange.content import Content +from exchange.providers import AnthropicProvider +from goose.toolkit.base import Toolkit +from goose.utils.ask import ask_an_ai +import queue +import time +import threading +import os +import signal +import atexit +import sys +from pathlib import Path +import json + +from .utils.task_utils import parse_duration, format_task_status, write_intermediate_result, trigger_apple_dialog +from .utils.analysis_utils import create_analysis_prompt +from .tools.search_tools import SearchTools +from .tools.analysis_tools import 
AnalysisTools +from .tools.task_management_tools import TaskManagementTools + +class CriticalSystemsThinking(Toolkit, SearchTools, AnalysisTools, TaskManagementTools): + """Critical systems thinking toolkit for understanding and solving complex problems.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.task_queue = queue.Queue() + self.autonomous_mode = False + self.ongoing_tasks = {} + self.completed_tasks = [] + self.latest_results = {} + self.pause_flags = {} + self.loop_threads = {} + self.final_results = {} # New attribute to store final results of completed tasks + self.failed_tasks = [] # New attribute to store failed tasks + self.results_folder = Path.home() / ".goose" / "results" + os.makedirs(self.results_folder, exist_ok=True) + self.log_file = Path.home() / ".goose" / "session_log.json" + self.log_file.parent.mkdir(parents=True, exist_ok=True) + self.tool_log_file = Path.home() / ".goose" / "tool_calls.log" + self.tool_log_file.parent.mkdir(parents=True, exist_ok=True) + + # Set up signal handler for graceful shutdown + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + atexit.register(self._cleanup) + + def log_tool_call(self, tool_name: str, parameters: dict): + """Log a tool call to the tool_calls.log file.""" + timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + log_entry = f"{timestamp} - Tool: {tool_name}, Parameters: {json.dumps(parameters)}\n" + with open(self.tool_log_file, "a") as log: + log.write(log_entry) + + def _signal_handler(self, signum, frame): + """Handle termination signals.""" + self.notify("Received termination signal. Cleaning up...") + self._cleanup() + sys.exit(0) + + def _cleanup(self): + """Clean up all running tasks and terminate threads.""" + self.notify("Cleaning up and aborting all tasks...") + for task_id in list(self.ongoing_tasks.keys()): + self._abort_task(task_id) + + # Terminate all threads + for thread in self.loop_threads.values(): + if thread.is_alive(): + thread.join(timeout=1) # Give threads 1 second to finish + + # Force terminate any remaining threads + for thread in threading.enumerate(): + if thread != threading.main_thread(): + try: + thread._stop() + except: + pass + + self.notify("All tasks aborted and threads terminated.") + + def _create_exchange(self, include_history=False, system=None) -> Exchange: + """Create a new Exchange instance with optional message history.""" + provider = AnthropicProvider.from_env() + messages = [] + if include_history: + messages = [ + Message(role=msg.role, content=[self.message_content(content) for content in msg.content]) + for msg in self.exchange_view.processor.messages + ] + return Exchange( + provider=provider, + model="claude-3-5-sonnet-20240620", + messages=messages, + system=system + ) + + def _ask_ai(self, prompt: str, include_history=False, system=None) -> str: + """Helper method to handle AI requests.""" + exchange = self._create_exchange(include_history, system) + response = ask_an_ai(input=prompt, exchange=exchange, no_history=not include_history) + return response.content[0].text + + def message_content(self, content: Content) -> Text: + return Text(str(content)) if not isinstance(content, Text) else content + + def notify(self, message: str): + """Standardized notification method for concise status updates.""" + self.notifier.status(f"[CST] {message[:50]}...") + + def add_task(self, task_id: str, task_description: str, duration: int): + """Add a task to the ongoing tasks 
dictionary.""" + self.log_tool_call("add_task", { + "task_id": task_id, + "task_description": task_description[:50] + "...", + "duration": duration + }) + self.ongoing_tasks[task_id] = { + "description": task_description, + "start_time": time.time(), + "duration": duration + } + + def complete_task(self, task_id: str, result: str, success: bool = True): + """Move a task from ongoing to completed or failed and record result.""" + if task_id in self.ongoing_tasks: + task = self.ongoing_tasks.pop(task_id) + task.update({"result": result, "end_time": time.time()}) + if success: + self.completed_tasks.append(task) + self.final_results[task_id] = result # Store the final result + else: + self.failed_tasks.append(task) + if task_id in self.latest_results: + del self.latest_results[task_id] # Remove from latest_results as it's now completed or failed + + def _abort_task(self, task_id: str): + """Abort a running task.""" + if task_id in self.ongoing_tasks: + self.pause_flags[task_id] = True + if task_id in self.loop_threads: + self.loop_threads[task_id].join(timeout=1) # Wait for the thread to finish + self.complete_task(task_id, "Task aborted", success=False) + + def _call_llm(self, prompt: str) -> str: + """ + Call the language model to refine the answer. + + Args: + prompt (str): The prompt to send to the language model. + + Returns: + response (str): The refined answer from the language model. + """ + # self.log_tool_call("_call_llm", {"prompt": prompt[:50] + "..."}) # Log first 50 chars of prompt + return self._ask_ai(prompt, include_history=True, system=self.system_prompt()) + + def _check_stop_condition(self, task_description: str, current_answer: str, + iterations: int, elapsed_time: int, total_duration: int) -> dict: + """ + Check if the autonomous loop should stop based on the current state of the task. + + Args: + task_description (str): The description of the task being performed. + current_answer (str): The current refined answer. + iterations (int): The number of iterations completed. + elapsed_time (int): The time elapsed since the task started (in seconds). + total_duration (int): The total allowed duration for the task (in seconds). + + Returns: + dict: A dictionary containing the stop decision and reasoning. + """ + self.log_tool_call("_check_stop_condition", { + "task_description": task_description[:50] + "...", + "current_answer": current_answer[:50] + "...", + "iterations": iterations, + "elapsed_time": elapsed_time, + "total_duration": total_duration + }) + + prompt = f""" + Task: {task_description} + Current answer: {current_answer} + Iterations completed: {iterations} + Elapsed time: {elapsed_time} seconds + Total allowed duration: {total_duration} seconds + + Based on the current state of the task, determine if the autonomous loop should stop. + Consider the quality and completeness of the current answer, the number of iterations, + and the time constraints. + + Provide your response in the following JSON format: + {{ + "should_stop": boolean, + "reasoning": "A brief explanation of why the task should or should not stop." 
+ }} + """ + + exchange = self._create_exchange(include_history=True, system=self.system_prompt()) + response = ask_an_ai(input=prompt, exchange=exchange) + + try: + result = json.loads(response.content[0].text) + if not isinstance(result, dict) or "should_stop" not in result or "reasoning" not in result: + raise ValueError("Invalid JSON structure") + return result + except json.JSONDecodeError: + self.notify("Error: Invalid JSON response from LLM for stop condition check.") + return {"should_stop": False, "reasoning": "Error in LLM response format. Continuing task."} + except ValueError as e: + self.notify(f"Error: {str(e)}") + return {"should_stop": False, "reasoning": "Error in LLM response structure. Continuing task."} + + def system_prompt(self) -> str: + """Retrieve instructions on how to use this reasoning tool.""" + return Message.load("prompts/critical_systems_thinking.jinja").text \ No newline at end of file diff --git a/src/goose_plugins/toolkits/prompts/critical_systems_thinking.jinja b/src/goose_plugins/toolkits/prompts/critical_systems_thinking.jinja new file mode 100644 index 0000000..fc5c86c --- /dev/null +++ b/src/goose_plugins/toolkits/prompts/critical_systems_thinking.jinja @@ -0,0 +1,170 @@ +You are an AI Agent. Please ensure you meet the following requirements: + +Feature: Smart Problem-Solving Assistant + As someone who needs help solving complex problems + I want an AI assistant that thinks carefully and systematically + So that I get well-thought-out solutions that consider all important aspects + + Background: Assistant Capabilities + Given I am an AI assistant that specializes in solving problems + And I have special tools to help analyze situations thoroughly + And I always follow a careful step-by-step approach + + # This section describes how I handle different types of questions + Scenario: Answering Simple Questions + When someone asks me a straightforward question + Then I give a clear, direct answer + And I keep it brief and to the point + And I don't overcomplicate things with unnecessary analysis + + Scenario: Handling Technical Problems + When someone asks me about a technical issue + Then I break down the problem step by step + And I explain my reasoning clearly + And I use examples to illustrate my points + + Scenario: Solving Complex Problems + When someone presents a complicated situation + Then I analyze it from multiple angles + And I consider how different parts affect each other + And I look for hidden issues that might be important + + # This section explains how I use my analysis tools + Scenario Outline: Using Helper Tools + When I need help with "<task>" + Then I use my "<tool>" to assist me + And I tell you I'm using this tool + And I share what the tool tells me + + Examples: Common Tasks and Tools + | task | tool | + | Understanding unclear requests | analyze_request | + | Finding information | search | + | Identifying who's involved | stakeholder_analysis | + | Mapping how things connect | system_mapping | + | Thinking about future impacts | generate_future_scenarios | + | Coming up with solutions | consider_solutions | + | Checking for problems | risk_assessment | + | Looking up web information | review_web_page | + | Analyzing ethical implications | ethical_analysis | + | Performing structured analysis | structured_analysis | + | Running background tasks | autonomous_loop | + | Checking background job status | get_background_job_status | + | Getting latest results | get_latest_results | + | Getting task information | get_task_info | |
Interpreting task queries | interpret_task_query | + | Listing all tasks | list_task_ids | + | Pausing background tasks | pause_autonomous_loop | + | Resuming background tasks | resume_autonomous_loop | + + # This section shows my complete problem-solving process + Scenario: Complete Problem-Solving Process + Given someone has asked me to solve a complex problem + + # Step 1: Understanding the Problem + When I first look at the problem + Then I make sure I understand what's being asked + And I gather all necessary background information + And I use the analyze_request tool if the request is unclear + + # Step 2: Identifying People Affected + When I think about who's involved + Then I list all the people or groups affected using the stakeholder_analysis tool + And I consider what matters to each of them + + # Step 3: Understanding Connections + When I look at how things are connected + Then I draw a map of all the important parts using the system_mapping tool + And I show how they affect each other + + # Step 4: Thinking About the Future + When I consider what might happen + Then I use the generate_future_scenarios tool to imagine different possible outcomes + And I think about future challenges and opportunities + + # Step 5: Finding Solutions + When I work on solutions + Then I use the consider_solutions tool to come up with several possible approaches + And I think through each one carefully + + # Step 6: Checking for Problems + When I review my solutions + Then I use the risk_assessment tool to look for potential problems + And I think about how to prevent or mitigate them + + # Step 7: Ethical Considerations + When I evaluate the ethical implications of my solutions + Then I use the ethical_analysis tool to assess potential ethical issues + And I ensure my recommendations align with ethical principles + + # Step 8: Structured Analysis + When I need to organize my thoughts and ensure comprehensive coverage + Then I use the structured_analysis tool to apply the MECE principle + And I ensure all aspects of the problem are considered + + # Step 9: Final Recommendation + When I make my final recommendation + Then I explain why I chose this solution + And I describe how to implement it + And I mention any important warnings or advice + And I provide a summary of the ethical considerations and risk assessment + + # This section explains how I communicate + Rule: How I Explain Things + Given I'm explaining something to someone + Then I match their level of technical knowledge + And I use clear, simple language + And I give practical examples + And I break complex ideas into smaller parts + And I check if they need more clarification + + Rule: How I Handle Technical Topics + Given I'm discussing technical matters + Then I reference best practices in that field + And I point out important safety concerns + And I consider how easy it is to maintain + And I suggest ways to monitor if it's working + And I mention any rules or regulations to follow + + Rule: How I Make Practical Recommendations + Given I'm suggesting practical solutions + Then I consider what's realistically possible + And I think about available resources + And I suggest simple solutions first + And I always have backup plans + And I consider how people will actually use it + + # This section explains how I handle background tasks + Scenario: Managing Background Tasks + Given I need to perform a time-consuming analysis + When I start an autonomous_loop task + Then I provide regular updates on its progress + And I use 
get_background_job_status to check on all running tasks + And I use get_latest_results to retrieve the most recent findings + And I can pause_autonomous_loop or resume_autonomous_loop as needed + And I use get_task_info to provide detailed information about specific tasks + And I can interpret_task_query to answer questions about ongoing tasks + And I use list_task_ids to keep track of all tasks, ongoing and completed + + Rule: How I Handle Web Content + Given I need to gather information from the web + When I use the review_web_page tool + Then I summarize the content concisely + And I extract the most relevant information + And I consider the credibility of the source + And I integrate this information into my analysis + + Rule: How I Conduct Searches + Given I need additional information + When I use the search tool + Then I craft specific and targeted search queries + And I analyze the search results critically + And I synthesize information from multiple sources + And I cite the sources in my responses + + Rule: How I Adapt to New Information + Given I receive new information during the problem-solving process + When I incorporate this new data + Then I reassess my previous conclusions + And I update my recommendations if necessary + And I explain the reasons for any changes in my analysis \ No newline at end of file diff --git a/src/goose_plugins/toolkits/tools/analysis_tools.py b/src/goose_plugins/toolkits/tools/analysis_tools.py new file mode 100644 index 0000000..26c8304 --- /dev/null +++ b/src/goose_plugins/toolkits/tools/analysis_tools.py @@ -0,0 +1,139 @@ +from goose.toolkit.base import tool +from ..utils.analysis_utils import create_analysis_prompt + +class AnalysisTools: + @tool + def structured_analysis(self, problem: str) -> str: + """ + Perform a structured analysis of the given problem using the MECE principle. + + Args: + problem (str): A description of the problem to analyze. + + Returns: + response (str): A JSON string containing the structured analysis. + """ + self.notify("Performing structured analysis") + self.log_tool_call("structured_analysis", {"problem": problem}) + return self._ask_ai(create_analysis_prompt("structured", p=problem)) + + @tool + def analyze_request(self, statement: str) -> str: + """ + When a request is unclear, high-level, or ambiguous, use this tool to + analyze the request and provide a well thought out response. + + Args: + statement (str): description of problem or errors seen. + + Returns: + response (str): A well thought out response to the statement or question. + """ + self.notifier.status("analyzing request...") + self.log_tool_call("analyze_request", {"statement": statement}) + return self._ask_ai( + create_analysis_prompt("request", s=statement), + include_history=True, + system=self.system_prompt() + ) + + @tool + def consider_solutions(self, statement: str) -> str: + """ + Provide a well thought out response to the statement: summarize the + problem and provide a solution or a set of solutions. + + Args: + statement (str): description of problem or errors seen. + + Returns: + response (str): A well thought out response to the statement or question. + """ + self.notifier.status("considering solutions...") + self.log_tool_call("consider_solutions", {"statement": statement}) + return self._ask_ai( + create_analysis_prompt("solutions", s=statement), + include_history=True + ) + + @tool + def stakeholder_analysis(self, problem_statement: str) -> str: + """ + Identify and analyze key stakeholders related to the given problem.
+ + Args: + problem_statement (str): A description of the problem or situation. + + Returns: + response (str): A JSON string containing a list of stakeholders, their interests, and potential impacts. + """ + self.notifier.status("Analyzing stakeholders...") + return self._ask_ai( + create_analysis_prompt("stakeholder", p=problem_statement), + include_history=True + ) + + @tool + def generate_future_scenarios(self, problem_statement: str, time_horizon: str) -> str: + """ + Generate potential future scenarios based on the given problem statement and time horizon. + + Args: + problem_statement (str): A description of the current problem or situation. + time_horizon (str): The future time frame to consider (e.g., "5 years", "10 years", "50 years"). + + Returns: + response (str): A JSON string containing a list of potential future scenarios. + """ + self.notifier.status("Generating future scenarios...") + return self._ask_ai( + create_analysis_prompt("future_scenarios", p=problem_statement, t=time_horizon) + ) + + @tool + def system_mapping(self, problem_statement: str) -> str: + """ + Create a high-level system map based on the given problem statement. + + Args: + problem_statement (str): A description of the problem or situation. + + Returns: + response (str): A JSON string representing a high-level system map. + """ + self.notifier.status("Creating system map...") + return self._ask_ai(create_analysis_prompt("system_mapping", p=problem_statement)) + + @tool + def risk_assessment(self, problem_statement: str, proposed_solution: str) -> str: + """ + Perform a risk assessment for the given problem and proposed solution. + + Args: + problem_statement (str): A description of the problem or situation. + proposed_solution (str): A description of the proposed solution. + + Returns: + response (str): A JSON string containing a list of potential risks and their assessments. + """ + self.notifier.status("Performing risk assessment...") + return self._ask_ai( + create_analysis_prompt("risk", p=problem_statement, s=proposed_solution) + ) + + @tool + def ethical_analysis(self, problem_statement: str, proposed_solution: str) -> str: + """ + Perform an ethical analysis of the given problem and proposed solution. + + Args: + problem_statement (str): A description of the problem or situation. + proposed_solution (str): A description of the proposed solution. + + Returns: + response (str): A JSON string containing an ethical analysis of the problem and solution. + """ + self.notifier.status("Performing ethical analysis...") + return self._ask_ai( + create_analysis_prompt("ethical", p=problem_statement, s=proposed_solution) + ) \ No newline at end of file diff --git a/src/goose_plugins/toolkits/tools/search_tools.py b/src/goose_plugins/toolkits/tools/search_tools.py new file mode 100644 index 0000000..29b5a3a --- /dev/null +++ b/src/goose_plugins/toolkits/tools/search_tools.py @@ -0,0 +1,37 @@ +from goose.toolkit.base import tool +from goose_plugins.utils.serper_search import serper_search +from goose_plugins.utils.selenium_web_browser import get_web_page_content + +class SearchTools: + @tool + def search(self, query: str) -> str: + """ + Search the web for information using the Serper API. + + Args: + query (str): query to search for. + + Returns: + response (str): A JSON response containing search results. 
+ """ + self.notifier.status("searching...") + return serper_search(query) + + @tool + def review_web_page(self, url: str) -> str: + """ + Review the content of a web page by providing a summary of the content. + + Args: + url (str): URL of the web page to review. + + Returns: + response (str): A summary of the content of the web page. + """ + self.notifier.status(f"fetching content from {url}") + try: + web_content = get_web_page_content(url) + self.notifier.status(f"reviewing content: {web_content[:50]}...") + return self._ask_ai(f"summarize the following content: {web_content}") + except Exception as e: + return f"Error: {str(e)}" \ No newline at end of file diff --git a/src/goose_plugins/toolkits/tools/task_management_tools.py b/src/goose_plugins/toolkits/tools/task_management_tools.py new file mode 100644 index 0000000..b5d2d0f --- /dev/null +++ b/src/goose_plugins/toolkits/tools/task_management_tools.py @@ -0,0 +1,324 @@ +from goose.toolkit.base import tool +import json +import time +import threading +from ..utils.task_utils import format_task_status, parse_duration, write_intermediate_result, trigger_apple_dialog + +class TaskManagementTools: + @tool + def autonomous_loop(self, task_description: str, duration: str = "1m") -> str: + """ + Run a task autonomously in the background, continuously working on a problem and building a final answer. + + Args: + task_description (str): A description of the task to be performed. + duration (str): The total duration for the task. Format: "<number><unit>". + Units: 's' for seconds, 'm' for minutes, 'h' for hours. + Examples: "30s", "5m", "1h". Default: "1m". + + Returns: + response (str): A message indicating the continuous background task has been started. + """ + duration_seconds = parse_duration(duration) + interval_seconds = 20 # Enforcing 20-second interval + task_id = f"task_{int(time.time())}" + self.pause_flags[task_id] = False + self.latest_results[task_id] = {"current_answer": "", "iterations": 0, "status": "running"} + self.notifier.status(f"Starting autonomous loop for task {task_id}: {task_description}") + + def continuous_background_task(): + start_time = time.time() + current_answer = "" + iterations = 0 + + # Initialize the task in latest_results + self.latest_results[task_id] = { + "current_answer": current_answer, + "iterations": iterations, + "status": "running", + "elapsed_time": 0 + } + + try: + while time.time() - start_time < duration_seconds: + if self.pause_flags[task_id]: + time.sleep(1) + continue + + iterations += 1 + elapsed_time = int(time.time() - start_time) + + prompt = f""" + Task: {task_description} + Current answer: {current_answer} + Iteration: {iterations} + Elapsed time: {elapsed_time} seconds + Total duration: {duration_seconds} seconds + + Please refine the current answer using your critical thinking skills and the task description.
+ """ + + try: + # Call the LLM to refine the answer + refined_answer = self._call_llm(prompt) + + # Check if we should stop + stop_check = self._check_stop_condition(task_description, refined_answer, iterations, elapsed_time, duration_seconds) + + if stop_check["should_stop"]: + break + + current_answer = refined_answer + self.latest_results[task_id].update({ + "current_answer": current_answer, + "iterations": iterations, + "status": "running", + "elapsed_time": elapsed_time + }) + + # Write intermediate result + write_intermediate_result(self.results_folder, task_id, iterations, current_answer) + + except Exception as e: + error_message = f"Error in iteration {iterations}: {str(e)}" + # self.logger.error(f"Task {task_id}: {error_message}") + write_intermediate_result(self.results_folder, task_id, iterations, f"ERROR: {error_message}") + # Continue to the next iteration instead of breaking the loop + continue + + time.sleep(interval_seconds) + + self.complete_task(task_id, f"Continuous task '{task_description}' completed. Final answer: {current_answer}") + if task_id in self.latest_results: + self.latest_results[task_id]["status"] = "completed" + trigger_apple_dialog("Task Completed", f"Continuous task completed after {iterations} iterations.") + + except Exception as e: + error_message = f"Critical error in task {task_id}: {str(e)}" + self.notify(error_message) + write_intermediate_result(self.results_folder, task_id, iterations, f"CRITICAL ERROR: {error_message}") + self.complete_task(task_id, f"Continuous task '{task_description}' failed. Error: {error_message}") + if task_id in self.latest_results: + self.latest_results[task_id]["status"] = "failed" + trigger_apple_dialog("Task Failed", f"Continuous task failed after {iterations} iterations.") + + self.autonomous_mode = True + self.notify(f"Starting continuous background task: {task_description}") + self.add_task(task_id, task_description, duration_seconds) + + thread = threading.Thread(target=continuous_background_task) + self.loop_threads[task_id] = thread + thread.start() + return f"Continuous background task '{task_description}' (ID: {task_id}) has been started with a duration of {duration}." + + @tool + def get_background_job_status(self) -> str: + """Get the status of all background jobs (ongoing, completed, and failed).""" + current_time = time.time() + status = ["Background Job Status:\n"] + + if self.ongoing_tasks: + status.append("\nOngoing Tasks:") + for task_id, task in self.ongoing_tasks.items(): + status.append(format_task_status(task_id, task, current_time)) + + if self.completed_tasks: + status.append("\nCompleted Tasks:") + for task in self.completed_tasks: + status.append(format_task_status(task["description"], task, current_time)) + + if self.failed_tasks: + status.append("\nFailed Tasks:") + for task in self.failed_tasks: + status.append(format_task_status(task["description"], task, current_time)) + + return "\n".join(status) + + @tool + def pause_autonomous_loop(self, task_id: str) -> str: + """ + Pause the autonomous loop for a given task. + + Args: + task_id (str): The ID of the task to pause. + + Returns: + response (str): A message indicating whether the task was successfully paused. + """ + if task_id in self.pause_flags: + self.pause_flags[task_id] = True + return f"Task {task_id} has been paused." + else: + return f"Task {task_id} not found." + + @tool + def resume_autonomous_loop(self, task_id: str) -> str: + """ + Resume the autonomous loop for a given task. 
+ + Args: + task_id (str): The ID of the task to resume. + + Returns: + response (str): A message indicating whether the task was successfully resumed. + """ + if task_id in self.pause_flags: + self.pause_flags[task_id] = False + return f"Task {task_id} has been resumed." + else: + return f"Task {task_id} not found." + + @tool + def get_latest_results(self, task_id: str) -> str: + """ + Get the latest results for a given task. + + Args: + task_id (str): The ID of the task to get results for. + + Returns: + response (str): A string containing the latest results and iteration count, + or the final result if the task is completed. + """ + if task_id in self.final_results: + return f"Final result for completed task {task_id}:\n{self.final_results[task_id]}" + elif task_id in self.latest_results: + result = self.latest_results[task_id] + return f"Latest results for ongoing task {task_id}:\nIterations: {result['iterations']}\nCurrent answer: {result['current_answer']}" + else: + return f"No results found for task {task_id}." + + @tool + def list_task_ids(self) -> str: + """ + List all task IDs, including ongoing and completed tasks. + + Returns: + response (str): A string containing all task IDs. + """ + all_tasks = list(self.ongoing_tasks.keys()) + [task["description"] for task in self.completed_tasks] + return f"Task IDs:\n" + "\n".join(all_tasks) + + @tool + def get_task_info(self, task_id: str) -> str: + """ + Get detailed information about a specific task. + + Args: + task_id (str): The ID of the task to get information for. + + Returns: + response (str): A string containing detailed information about the task. + """ + if task_id in self.ongoing_tasks: + task = self.ongoing_tasks[task_id] + latest_result = self.latest_results.get(task_id, {}) + return f""" + Task ID: {task_id} + Description: {task['description']} + Status: Ongoing + Start Time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(task['start_time']))} + Elapsed Time: {int(time.time() - task['start_time'])} seconds + Iterations: {latest_result.get('iterations', 'N/A')} + Current Answer: {latest_result.get('current_answer', 'N/A')} + """ + elif task_id in [task["description"] for task in self.completed_tasks]: + task = next(task for task in self.completed_tasks if task["description"] == task_id) + return f""" + Task ID: {task_id} + Description: {task['description']} + Status: Completed + Start Time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(task['start_time']))} + End Time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(task['end_time']))} + Duration: {int(task['end_time'] - task['start_time'])} seconds + Result: {task['result']} + """ + else: + return f"No task found with ID: {task_id}" + + @tool + def interpret_task_query(self, query: str) -> str: + """ + Interpret a natural language query about tasks and provide relevant information. + + Args: + query (str): A natural language query about tasks. + + Returns: + response (str): The interpreted response to the query. + """ + prompt = f""" + Given the following natural language query about tasks, interpret the request and provide the most relevant information: + + Query: {query} + + Consider the following context: + - There may be ongoing, completed, and failed tasks. + - Task IDs are usually in the format "task_timestamp". + - The query might refer to the "last task" or use other relative terms. + - The user might use shorthand or informal language. + - The user might be specifically asking about failed tasks. + + Based on this query, determine: + 1. 
What specific task(s) is the user asking about? + 2. What information does the user want to know? + 3. Is the user specifically asking about failed tasks? + 4. How should I respond to this query using the available task information? + + Provide your response in the following JSON format: + {{ + "interpreted_task_ids": ["list", "of", "relevant", "task", "ids"], + "requested_info": "brief description of the information requested", + "response_method": "name of the method to use for responding (e.g., 'get_task_info', 'get_latest_results', 'list_task_ids', 'get_background_job_status')", + "additional_context": "any additional context or instructions for formulating the response", + "include_failed_tasks": boolean + }} + """ + + interpretation = self._ask_ai(prompt, include_history=True, system=self.system_prompt()) + + try: + result = json.loads(interpretation) + response = self._generate_task_response(result) + return response + except json.JSONDecodeError: + return f"Error: Unable to interpret the query. Please try rephrasing your question about the tasks." + + def _generate_task_response(self, interpretation: dict) -> str: + """ + Generate a response based on the interpreted task query. + + Args: + interpretation (dict): The interpreted query information. + + Returns: + response (str): The generated response to the query. + """ + response = f"Interpreted request: {interpretation['requested_info']}\n\n" + + if interpretation['response_method'] == 'list_task_ids': + response += self.list_task_ids() + elif interpretation['response_method'] == 'get_background_job_status': + response += self.get_background_job_status() + elif interpretation['response_method'] in ['get_task_info', 'get_latest_results']: + for task_id in interpretation['interpreted_task_ids']: + if interpretation['response_method'] == 'get_task_info': + response += self.get_task_info(task_id) + "\n" + else: + response += self.get_latest_results(task_id) + "\n" + else: + response += f"Unable to process the request using method: {interpretation['response_method']}" + + if interpretation.get('include_failed_tasks', False): + failed_tasks = [task for task in self.failed_tasks] + if failed_tasks: + response += "\nFailed Tasks:\n" + for task in failed_tasks: + response += f"- Task ID: {task['description']}\n Result: {task['result']}\n" + else: + response += "\nNo failed tasks found." + + if interpretation['additional_context']: + response += f"\nAdditional context: {interpretation['additional_context']}" + + return response \ No newline at end of file diff --git a/src/goose_plugins/toolkits/utils/analysis_utils.py b/src/goose_plugins/toolkits/utils/analysis_utils.py new file mode 100644 index 0000000..04ba58e --- /dev/null +++ b/src/goose_plugins/toolkits/utils/analysis_utils.py @@ -0,0 +1,77 @@ +def create_analysis_prompt(analysis_type: str, **kwargs) -> str: + """Create a prompt for various types of analysis.""" + prompts = { + "structured": lambda p: f""" + Perform a structured analysis of the following problem using the MECE principle: + {p} + Return the results as a JSON string with the following structure: + {{ + "problem": "Brief restatement of the problem", + "categories": [ + {{ + "name": "Category Name", + "elements": ["Element 1", "Element 2", ...], + "analysis": "Brief analysis of this category" + }}, + ... 
+ ], + "conclusion": "Overall conclusion based on the structured analysis" + }} + """, + "request": lambda s: f""" + Analyze the user statement: {s} + If you need to immediately clarify something and it's something + short and simple, respond with your question(s). + If you need multiple questions, you can ask multiple questions. + Please bullet point your questions. + Limit your response to 5 questions. + """, + "solutions": lambda s: f""" + Analyze the user statement: {s} + Consider the existing message history and provide a well thought out response. + Provide one or more potential solutions to the problem. + Limit your response to 5 solutions. + """, + "stakeholder": lambda p: f""" + Analyze the following problem statement and identify key stakeholders: + {p} + For each stakeholder, determine their interests and potential impacts. + """, + "future_scenarios": lambda p, t: f""" + Based on the following problem statement and time horizon, generate potential future scenarios: + Problem: {p} + Time Horizon: {t} + + Consider various factors such as technological advancements, societal changes, + environmental impacts, and potential policy shifts. + + Return the results as a JSON string with scenarios including name, description, + key factors, and potential outcomes. + Generate at least 3 distinct scenarios. + """, + "system_mapping": lambda p: f""" + Based on the following problem statement, create a high-level system map: + {p} + + Identify key components, their relationships, and potential feedback loops. + Return results as JSON with components and feedback loops. + """, + "risk": lambda p, s: f""" + Perform a risk assessment for the following problem and proposed solution: + Problem: {p} + Proposed Solution: {s} + + Identify potential risks, their likelihood, impact, and possible mitigation strategies. + Return results as JSON with detailed risk assessments. + """, + "ethical": lambda p, s: f""" + Perform an ethical analysis for the following problem and proposed solution: + Problem: {p} + Proposed Solution: {s} + + Consider various ethical frameworks and principles, potential ethical dilemmas, + and the impact on different stakeholders. + Return results as JSON with ethical considerations and overall assessment. 
+ """ + } + return prompts[analysis_type](**kwargs) \ No newline at end of file diff --git a/src/goose_plugins/toolkits/utils/logging_utils.py b/src/goose_plugins/toolkits/utils/logging_utils.py new file mode 100644 index 0000000..4ee3f30 --- /dev/null +++ b/src/goose_plugins/toolkits/utils/logging_utils.py @@ -0,0 +1,31 @@ +import logging +import sys +from pathlib import Path + +def setup_logging(): + """Set up logging for the CriticalSystemsThinking toolkit.""" + log_file = Path.home() / ".goose" / "critical_systems_thinking.log" + log_file.parent.mkdir(parents=True, exist_ok=True) + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(log_file), + logging.StreamHandler(sys.stdout) + ] + ) + return logging.getLogger(__name__) + +def log_action(log_file, action: str, details: str): + """Log an action to the session log file.""" + import time + import json + + log_entry = { + "timestamp": time.time(), + "action": action, + "details": details + } + with open(log_file, 'a') as f: + json.dump(log_entry, f) + f.write('\n') \ No newline at end of file diff --git a/src/goose_plugins/toolkits/utils/task_utils.py b/src/goose_plugins/toolkits/utils/task_utils.py new file mode 100644 index 0000000..31bb513 --- /dev/null +++ b/src/goose_plugins/toolkits/utils/task_utils.py @@ -0,0 +1,42 @@ +import time +import os + +def parse_duration(duration: str) -> int: + """Convert duration string to seconds.""" + value = int(duration[:-1]) + unit = duration[-1].lower() + multipliers = {'s': 1, 'm': 60, 'h': 3600} + if unit not in multipliers: + raise ValueError("Invalid duration format. Use 's' for seconds, 'm' for minutes, or 'h' for hours.") + return value * multipliers[unit] + +def format_task_status(task_id: str, task: dict, current_time: float) -> str: + """Format a single task's status.""" + elapsed_time = current_time - task["start_time"] + remaining_time = max(0, task.get("duration", 0) - elapsed_time) + status = f"- Task ID: {task_id}\n" + status += f" Description: {task['description']}\n" + if "end_time" in task: + status += f" Duration: {task['end_time'] - task['start_time']:.2f} seconds\n" + status += f" Result: {task['result']}\n" + else: + status += f" Elapsed Time: {elapsed_time:.2f} seconds\n" + status += f" Remaining Time: {remaining_time:.2f} seconds\n" + return status + +def write_intermediate_result(results_folder: str, task_id: str, iteration: int, result: str): + """Write intermediate results to a file for debugging purposes.""" + filename = f"{task_id}_iteration_{iteration}.txt" + filepath = os.path.join(results_folder, filename) + with open(filepath, 'w') as f: + f.write(f"Task ID: {task_id}\n") + f.write(f"Iteration: {iteration}\n") + f.write(f"Result:\n{result}\n") + return filepath + +def trigger_apple_dialog(title: str, message: str): + """Trigger an Apple dialog box without logging the result.""" + import subprocess + dialog_command = f'display dialog "{message}" buttons {{"OK"}} default button "OK" with title "{title}"' + subprocess.run(["osascript", "-e", dialog_command], capture_output=True, text=True) + # The result is captured but not used, effectively discarding it \ No newline at end of file diff --git a/src/goose_plugins/utils/log_streamer.py b/src/goose_plugins/utils/log_streamer.py new file mode 100644 index 0000000..f54b825 --- /dev/null +++ b/src/goose_plugins/utils/log_streamer.py @@ -0,0 +1,26 @@ +import asyncio +import aiofiles +import json +from pathlib import Path + +LOG_FILE = 
Path.home() / ".goose" / "session_log.json" + +async def stream_log(): + if not LOG_FILE.exists(): + print("No active Goose session log found.") + return + + async with aiofiles.open(LOG_FILE, mode='r') as f: + while True: + line = await f.readline() + if not line: + await asyncio.sleep(0.1) + continue + try: + log_entry = json.loads(line) + print(f"{log_entry['timestamp']} - {log_entry['action']}: {log_entry['details']}") + except json.JSONDecodeError: + print(f"Error decoding log entry: {line}") + +def start_log_stream(): + asyncio.run(stream_log()) \ No newline at end of file diff --git a/src/goose_plugins/utils/selenium_web_browser.py b/src/goose_plugins/utils/selenium_web_browser.py new file mode 100644 index 0000000..2a115e6 --- /dev/null +++ b/src/goose_plugins/utils/selenium_web_browser.py @@ -0,0 +1,54 @@ +from selenium import webdriver +from selenium.webdriver.chrome.service import Service +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions +from selenium.webdriver.common.by import By +from webdriver_manager.chrome import ChromeDriverManager +import time + + +def chrome_driver_path() -> str: + return "/opt/homebrew/bin/chromedriver" + + +def get_web_page_content(url: str, wait_time: int = 10) -> str: + # Set up Chrome options + chrome_options = Options() + chrome_options.add_argument("--headless") # Run in headless mode (no GUI) + chrome_options.add_argument("--disable-gpu") + chrome_options.add_argument("--no-sandbox") + chrome_options.add_argument("--disable-dev-shm-usage") + + # Set up the Chrome driver + service = Service(ChromeDriverManager().install()) + driver = webdriver.Chrome(service=service, options=chrome_options) + + try: + # Navigate to the URL + driver.get(url) + + # Wait for the page to load + WebDriverWait(driver, wait_time).until(expected_conditions.presence_of_element_located((By.TAG_NAME, "body"))) + + # Allow some time for JavaScript to execute + time.sleep(wait_time) + + # Get the page source + # page_source = driver.page_source + + # Get the text content + text_content = driver.find_element(By.TAG_NAME, "body").text + + return text_content + + finally: + # Close the browser + driver.quit() + + +if __name__ == "__main__": + url = "https://www.google.com" + content = get_web_page_content(url) + print(content) + print("...") diff --git a/src/goose_plugins/utils/serper_search.py b/src/goose_plugins/utils/serper_search.py new file mode 100644 index 0000000..023836e --- /dev/null +++ b/src/goose_plugins/utils/serper_search.py @@ -0,0 +1,10 @@ +import os +import json +import requests + + +def serper_search(query: str) -> str: + payload = json.dumps({"q": query}) + headers = {"X-API-KEY": os.getenv("SERPER_API_KEY"), "Content-Type": "application/json"} + response = requests.request("POST", "https://google.serper.dev/search", headers=headers, data=payload) + return response.text diff --git a/tests/toolkits/.DS_Store b/tests/toolkits/.DS_Store new file mode 100644 index 0000000..5008ddf Binary files /dev/null and b/tests/toolkits/.DS_Store differ diff --git a/tests/toolkits/test_critical_systems_thinking.py b/tests/toolkits/test_critical_systems_thinking.py new file mode 100644 index 0000000..a3d7001 --- /dev/null +++ b/tests/toolkits/test_critical_systems_thinking.py @@ -0,0 +1,146 @@ +import pytest +from unittest.mock import MagicMock, Mock, patch +from exchange import Exchange, Text +from exchange.content import Content +from typing import Any 
+from goose_plugins.toolkits.critical_systems_thinking import CriticalSystemsThinking + + +@pytest.fixture +def critical_systems_thinking() -> CriticalSystemsThinking: + notifier: MagicMock = MagicMock() + toolkit: CriticalSystemsThinking = CriticalSystemsThinking(notifier=notifier) + toolkit.exchange_view = Mock() + toolkit.exchange_view.processor = Mock() + toolkit.exchange_view.processor.messages = [] + return toolkit + + +def test_message_content_with_text(critical_systems_thinking: CriticalSystemsThinking) -> None: + text: Text = Text("test message") + result: Text = critical_systems_thinking.message_content(text) + + assert result == text + + +def test_message_content_with_other_content(critical_systems_thinking: CriticalSystemsThinking) -> None: + content: Mock = Mock(spec=Content) + content.__str__ = Mock(return_value="test content") + + result: Text = critical_systems_thinking.message_content(content) + + assert isinstance(result, Text) + assert result.text == "test content" + + +@patch("goose_plugins.toolkits.tools.search_tools.serper_search") +def test_search(mock_serper_search: Mock, critical_systems_thinking: CriticalSystemsThinking) -> None: + expected_response: str = '{"results": []}' + mock_serper_search.return_value = expected_response + query: str = "test query" + + result: str = critical_systems_thinking.search(query) + + assert result == expected_response + mock_serper_search.assert_called_once_with(query) + critical_systems_thinking.notifier.status.assert_called_once_with("searching...") + + +@patch("goose_plugins.toolkits.critical_systems_thinking.AnthropicProvider") +def test_analyze_request(mock_provider_class: Mock, critical_systems_thinking: CriticalSystemsThinking) -> None: + expected_analysis: str = "Analysis result" + mock_provider: Mock = Mock() + mock_provider_class.from_env.return_value = mock_provider + mock_response: Mock = Mock() + mock_response.content = [Text(expected_analysis)] + + with patch("goose_plugins.toolkits.critical_systems_thinking.ask_an_ai") as mock_ask: + mock_ask.return_value = mock_response + statement: str = "test statement" + + result: str = critical_systems_thinking.analyze_request(statement) + + assert result == expected_analysis + critical_systems_thinking.notifier.status.assert_called_with("analyzing request...") + + call_args: Any = mock_ask.call_args + assert statement in call_args[1]["input"] + assert isinstance(call_args[1]["exchange"], Exchange) + + +@patch("goose_plugins.toolkits.tools.search_tools.get_web_page_content") +@patch("goose_plugins.toolkits.critical_systems_thinking.AnthropicProvider") +def test_review_web_page( + mock_provider_class: Mock, mock_get_content: Mock, critical_systems_thinking: CriticalSystemsThinking +) -> None: + web_content: str = "Sample web content" + expected_summary: str = "Page summary" + url: str = "http://example.com" + + mock_get_content.return_value = web_content + mock_provider: Mock = Mock() + mock_provider_class.from_env.return_value = mock_provider + mock_response: Mock = Mock() + mock_response.content = [Text(expected_summary)] + + with patch("goose_plugins.toolkits.critical_systems_thinking.ask_an_ai") as mock_ask: + mock_ask.return_value = mock_response + + result: str = critical_systems_thinking.review_web_page(url) + + assert result == expected_summary + mock_get_content.assert_called_once_with(url) + critical_systems_thinking.notifier.status.assert_any_call(f"fetching content from {url}") + critical_systems_thinking.notifier.status.assert_any_call(f"reviewing content: 
{web_content[:50]}...") + + +@patch("goose_plugins.toolkits.tools.search_tools.get_web_page_content") +def test_review_web_page_error(mock_get_content: Mock, critical_systems_thinking: CriticalSystemsThinking) -> None: + error_message: str = "Failed to fetch" + url: str = "http://example.com" + mock_get_content.side_effect = Exception(error_message) + + result: str = critical_systems_thinking.review_web_page(url) + + assert result.startswith("Error:") + mock_get_content.assert_called_once_with(url) + + +@patch("goose_plugins.toolkits.critical_systems_thinking.AnthropicProvider") +def test_consider_solutions(mock_provider_class: Mock, critical_systems_thinking: CriticalSystemsThinking) -> None: + expected_solution: str = "Solution analysis" + mock_provider: Mock = Mock() + mock_provider_class.from_env.return_value = mock_provider + mock_response: Mock = Mock() + mock_response.content = [Text(expected_solution)] + + with patch("goose_plugins.toolkits.critical_systems_thinking.ask_an_ai") as mock_ask: + mock_ask.return_value = mock_response + statement: str = "test problem" + + result: str = critical_systems_thinking.consider_solutions(statement) + + assert result == expected_solution + critical_systems_thinking.notifier.status.assert_called_with("considering solutions...") + + call_args: Any = mock_ask.call_args + assert statement in call_args[1]["input"] + assert isinstance(call_args[1]["exchange"], Exchange) + + +def test_system_prompt(critical_systems_thinking: CriticalSystemsThinking) -> None: + expected_prompt: str = "System prompt content" + + with patch("exchange.Message.load") as mock_load: + mock_message: Mock = Mock() + mock_message.text = expected_prompt + mock_load.return_value = mock_message + + result: str = critical_systems_thinking.system_prompt() + + assert result == expected_prompt + mock_load.assert_called_once_with("prompts/critical_systems_thinking.jinja") + + +if __name__ == "__main__": + pytest.main(["-v"])
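
For reference when reviewing the diff above: the heart of autonomous_loop is a timed refine-and-record loop built on parse_duration from task_utils plus a per-task pause flag. The sketch below is an illustration only and is not part of the PR; it has no goose dependencies, mirrors parse_duration as added in task_utils.py, and uses a hypothetical fake_refine callback in place of the real _call_llm / _check_stop_condition round trips to the Anthropic model.

import threading
import time


def parse_duration(duration: str) -> int:
    # Mirrors task_utils.parse_duration from the diff: "30s" -> 30, "5m" -> 300, "1h" -> 3600.
    value = int(duration[:-1])
    unit = duration[-1].lower()
    multipliers = {"s": 1, "m": 60, "h": 3600}
    if unit not in multipliers:
        raise ValueError("Invalid duration format. Use 's', 'm', or 'h'.")
    return value * multipliers[unit]


def run_refinement_loop(task: str, duration: str, pause_flag: dict, results: dict, refine, interval: float = 1.0) -> None:
    # Keep refining an answer until the duration elapses, honoring the pause flag
    # (the toolkit's continuous_background_task does the same with a 20-second interval).
    deadline = time.time() + parse_duration(duration)
    answer, iterations = "", 0
    while time.time() < deadline:
        if pause_flag["paused"]:
            time.sleep(0.1)
            continue
        iterations += 1
        answer = refine(task, answer, iterations)
        results.update({"current_answer": answer, "iterations": iterations, "status": "running"})
        time.sleep(interval)
    results["status"] = "completed"


if __name__ == "__main__":
    # Hypothetical stand-in for the LLM call; the real toolkit prompts Claude via ask_an_ai.
    def fake_refine(task, current, iteration):
        return f"{task}: draft v{iteration}"

    pause_flag = {"paused": False}
    results = {"current_answer": "", "iterations": 0, "status": "running"}
    worker = threading.Thread(target=run_refinement_loop, args=("Summarize system risks", "3s", pause_flag, results, fake_refine))
    worker.start()
    time.sleep(1)
    print("mid-run:", results["iterations"], results["status"])  # partial progress while the loop is still running
    worker.join()
    print("final:", results["current_answer"], results["status"])  # status is "completed" once the deadline passes

The real implementation layers task bookkeeping (ongoing_tasks, latest_results, final_results), per-iteration result files via write_intermediate_result, and an LLM-driven stop check on top of this skeleton; pause_autonomous_loop and resume_autonomous_loop simply flip the per-task pause flag shown here.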