From 6fe37fc9bab348acaaa161d0977b2e5bc544faad Mon Sep 17 00:00:00 2001 From: David <3dgiordano@gmail.com> Date: Thu, 6 Nov 2025 17:14:47 -0300 Subject: [PATCH 1/4] =?UTF-8?q?Code=20Improvements=20and=20allow=20manager?= =?UTF-8?q?s=20without=20=C2=B4args=C2=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/perfecto.py | 20 +-- formatters/help.py | 3 +- tools/device_manager.py | 5 +- tools/execution_manager.py | 5 +- tools/help_manager.py | 8 +- tools/help_utils.py | 303 ++++++++++++++++++++++++++++++++++ tools/user_manager.py | 5 +- tools/utils.py | 321 +++---------------------------------- 8 files changed, 343 insertions(+), 327 deletions(-) create mode 100644 tools/help_utils.py diff --git a/config/perfecto.py b/config/perfecto.py index f3c53a5..07f837b 100644 --- a/config/perfecto.py +++ b/config/perfecto.py @@ -1,7 +1,3 @@ -import base64 -from pathlib import Path -from importlib import resources - TOOLS_PREFIX: str = "perfecto" WEBSITE: str = "https://github.com/PerfectoCore/perfecto-mcp/" GITHUB: str = "https://github.com/PerfectoCore/perfecto-mcp" @@ -18,18 +14,6 @@ HELP_INDEX_URL = f"{HELP_TOC_URL}perfecto_help.js" HELP_BASE_CONTENT_URL = "https://help.perfecto.io/perfecto-help/content/" -def get_mcp_icon_uri(): - name = "app.png" - try: - icon_path = resources.files("../resources").joinpath(name) - # print(icon_path) - except Exception: - icon_path = (Path(__file__).parent.parent / "resources" / name) - # print(icon_path) - - # icon_path = Path(__file__).parent.parent / "app.png" - icon_data = base64.standard_b64encode(icon_path.read_bytes()).decode() - return f"data:image/png;base64,{icon_data}" def get_tenant_management_api_url(cloud_name: str) -> str: return f"https://{cloud_name}.app.perfectomobile.com/tenant-management-webapp/rest/v1/tenant-management/tenants/current" @@ -70,8 +54,10 @@ def get_virtual_device_management_api_url(cloud_name: str) -> str: def get_web_desktop_management_api_url(cloud_name: str) -> str: return f"https://{cloud_name}.perfectomobile.com/web/api/v1/config/devices" + def get_real_devices_extended_commands_help_url() -> str: return "https://help.perfecto.io/perfecto-help/content/perfecto/automation-testing/perfecto_extensions.htm" + def get_real_devices_extended_command_base_help_url() -> str: - return "https://help.perfecto.io/perfecto-help/content/perfecto/automation-testing/" \ No newline at end of file + return "https://help.perfecto.io/perfecto-help/content/perfecto/automation-testing/" diff --git a/formatters/help.py b/formatters/help.py index b10c118..2702776 100644 --- a/formatters/help.py +++ b/formatters/help.py @@ -2,7 +2,8 @@ from lxml import html -from tools.utils import html_to_markdown +from tools.help_utils import html_to_markdown + def format_help_info(html_content: str, params: Optional[dict] = None) -> dict[str, Any]: base_url = params.get("base_url") diff --git a/tools/device_manager.py b/tools/device_manager.py index 6f4254f..d037de0 100644 --- a/tools/device_manager.py +++ b/tools/device_manager.py @@ -77,10 +77,11 @@ def register(mcp, token: Optional[PerfectoToken]): ) async def devices( action: str = Field(description="The action id to execute"), - args: Dict[str, Any] = Field(description="Dictionary with parameters"), + args: Dict[str, Any] = Field(description="Dictionary with parameters", default=None), ctx: Context = Field(description="Context object providing access to MCP capabilities") ) -> BaseResult: - + if args is None: + args = {} device_manager = DeviceManager(token, ctx) try: match action: diff --git a/tools/execution_manager.py b/tools/execution_manager.py index 24d1355..f720530 100644 --- a/tools/execution_manager.py +++ b/tools/execution_manager.py @@ -224,10 +224,11 @@ def register(mcp, token: Optional[PerfectoToken]): ) async def execution( action: str = Field(description="The action id to execute"), - args: Dict[str, Any] = Field(description="Dictionary with parameters"), + args: Dict[str, Any] = Field(description="Dictionary with parameters", default=None), ctx: Context = Field(description="Context object providing access to MCP capabilities") ) -> BaseResult: - + if args is None: + args = {} execution_manager = ExecutionManager(token, ctx) try: match action: diff --git a/tools/help_manager.py b/tools/help_manager.py index 551664b..a4f39a1 100644 --- a/tools/help_manager.py +++ b/tools/help_manager.py @@ -15,7 +15,8 @@ format_read_real_devices_extended_command_info, format_help_info from models.manager import Manager from models.result import BaseResult -from tools.utils import http_request, convert_js_to_py_dict +from tools.help_utils import convert_js_to_py_dict +from tools.utils import http_request class HelpManager(Manager): @@ -242,10 +243,11 @@ def register(mcp, token: Optional[PerfectoToken]): ) async def help_main( action: str = Field(description="The action id to execute"), - args: Dict[str, Any] = Field(description="Dictionary with parameters"), + args: Dict[str, Any] = Field(description="Dictionary with parameters", default=None), ctx: Context = Field(description="Context object providing access to MCP capabilities") ) -> BaseResult: - + if args is None: + args = {} help_manager = HelpManager(token, ctx) try: match action: diff --git a/tools/help_utils.py b/tools/help_utils.py new file mode 100644 index 0000000..619a1a2 --- /dev/null +++ b/tools/help_utils.py @@ -0,0 +1,303 @@ +import json +import re +from urllib.parse import urljoin + +import lxml.html + + +def clean_text(text, preserve_newlines=False): + text = text.replace('\xa0', ' ') + + if preserve_newlines: + lines = text.split('\n') + cleaned_lines = [' '.join(line.split()) for line in lines] + return '\n'.join(cleaned_lines).strip() + else: + text = ' '.join(text.split()) + return text.strip() + + +def extract_text_with_br(element): + html_str = lxml.html.tostring(element, encoding='unicode', method='html') + html_str = html_str.replace('
', '\n').replace('
', '\n').replace('
', '\n') + temp = lxml.html.fromstring(html_str) + return temp.text_content() + + +def table_to_markdown(table, base_url=None, as_html=True): + rows = table.xpath('.//tr') + if not rows: + return "" + + markdown = [] + + if as_html: + markdown.append("") + + header_row = table.xpath('.//thead//tr[1] | .//tr[1]') + if header_row: + headers = [] + for th in header_row[0].xpath('.//th | .//td'): + header_text = process_inline_elements(th, base_url, as_html) + header_text = clean_text(header_text) + headers.append(header_text) + + if headers and any(h for h in headers if h): + if as_html: + header_html = "" + for header in headers: + header_html += "" + header_html = "" + header_html + "" + markdown.append(header_html) + else: + markdown.append("| " + " | ".join(headers) + " |") + markdown.append("| " + " | ".join(["---"] * len(headers)) + " |") + start_idx = 1 + else: + start_idx = 0 + else: + start_idx = 0 + + if as_html: + markdown.append("") + + for row in rows[start_idx:]: + if as_html: + markdown.append("") + cells = row.xpath('.//td | .//th') + if cells: + cell_texts = [] + for cell in cells: + cell_text = process_inline_elements(cell, base_url, as_html) + cell_text = clean_text(cell_text) + cell_texts.append(cell_text) + + if any(cell_texts): + if as_html: + cells_html = "" + for cell in cell_texts: + cells_html += "" + markdown.append(cells_html) + else: + markdown.append("| " + " | ".join(cell_texts) + " |") + if as_html: + markdown.append("") + + if as_html: + markdown.append("
" + header + "
" + cell.replace("\n", "
") + "
") + + return "\n".join(markdown) if markdown else "" + + +def process_inline_elements(element, base_url=None, as_html=False): + parts = [] + + if element.text: + parts.append(element.text) + + for child in element: + tag = child.tag.lower() + + if tag == 'a': + href = child.get('href', '') + text = child.text_content().strip() + + if href and base_url: + href = urljoin(base_url, href) + + if text.lower() in ['copy', 'link', ''] or 'javascript:' in href: + if child.tail: + parts.append(child.tail) + continue + + if text and href: + if as_html: + parts.append(f"{text}") + else: + parts.append(f"[{text}]({href})") + elif text: + parts.append(text) + + elif tag == 'br': + if as_html: + parts.append('
') + else: + parts.append('\n') + elif tag in ['strong', 'b']: + text = child.text_content().strip() + if text: + if as_html: + parts.append(f"{text}") + else: + parts.append(f"**{text}**") + elif tag in ['em', 'i']: + text = child.text_content().strip() + if text: + if as_html: + parts.append(f"{text}") + else: + parts.append(f"*{text}*") + elif tag == 'code': + text = child.text_content().strip() + if text: + if as_html: + parts.append(f"{text}") + else: + parts.append(f"`{text}`") + else: + inner_result = process_inline_elements(child, base_url) + if inner_result: + parts.append(inner_result) + + if child.tail: + parts.append(child.tail) + + return ''.join(parts) + + +def element_to_markdown(element, base_url=None, level=0): + code_block_lang = ['javascript', 'java', 'python', 'ruby', 'go', 'php', 'c#', 'csharp', 'typescript', + 'bash', 'shell', 'sql', 'json', 'xml', 'yaml', 'css', 'html'] + + tag = element.tag.lower() + result = [] + + # Headers + if tag in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']: + level_num = int(tag[1]) + text = clean_text(element.text_content()) + if text: + result.append(f"{'#' * level_num} {text}\n") + + elif tag == 'p': + text = clean_text(process_inline_elements(element, base_url)) + if text: + result.append(f"{text}\n") + + elif tag == 'ul': + for li in element.xpath('./li'): + text = clean_text(process_inline_elements(li, base_url)) + if text: + result.append(f"- {text}\n") + result.append("") + + elif tag == 'ol': + for i, li in enumerate(element.xpath('./li'), 1): + text = clean_text(process_inline_elements(li, base_url)) + if text: + result.append(f"{i}. {text}\n") + result.append("") + + elif tag == 'table': + table_md = table_to_markdown(element, base_url) + if table_md: + result.append(f"{table_md}\n") + + elif tag == 'pre' or 'codesnippet' in element.get('class', '').lower(): + lang = "" + code_element = element + + # Try to get the language code and the source code + + lang_elem = element.xpath('.//*[contains(@class, "language")]') + if not lang_elem: + for child in element: + child_text = child.text_content().strip().lower() + if child_text in code_block_lang: + lang = child_text + break + + code_children = element.xpath('.//code') + if code_children: + code_element = code_children[0] + class_attr = code_element.get('class', '') + if 'language-' in class_attr: + lang = class_attr.split('language-')[1].split()[0] + + code_text = extract_text_with_br(code_element) + code_text = clean_text(code_text, preserve_newlines=True) + + lines = code_text.split('\n') + filtered_lines = [] + + for i, line in enumerate(lines): + line_stripped = line.strip().lower() + if line_stripped == 'copy': # Exclude Copy element (UI Element) + continue + if line_stripped in code_block_lang: + lang = line_stripped + + filtered_lines.append(line) + + code_text = '\n'.join(filtered_lines).strip() + + if code_text: + result.append(f"```{lang}\n{code_text}\n```\n") + + elif tag == 'blockquote': + text = extract_text_with_br(element) + text = clean_text(text) + if text: + for line in text.split('\n'): + if line.strip(): + result.append(f"> {line}\n") + result.append("") + + elif tag == 'hr': + result.append("---\n") + + elif tag == 'img': + alt = element.get('alt', '') + src = element.get('src', '') + if src and base_url: + src = urljoin(base_url, src) + result.append(f"![{alt}]({src})\n") + + elif tag in ['script', 'style', 'noscript', 'meta', 'link', 'head']: + # Ignore + pass + + # For any others elements, process the children + else: + for child in element: + child_md = element_to_markdown(child, base_url, level + 1) + if child_md: + result.extend(child_md) + + return result + + +def html_to_markdown(html_content, base_url=None): + tree = lxml.html.fromstring(html_content) + + main_div = tree.xpath('//div[@role="main"]') + if not main_div: + main_div = tree.xpath('//main | //div[@class="main"] | //article | //body') + + if not main_div: + return "# Error\n\nMain content not found" + + main_div = main_div[0] + + markdown_lines = [] + for child in main_div: + result = element_to_markdown(child, base_url) + if result: + markdown_lines.extend(result) + + markdown = "".join(markdown_lines) + + while "\n\n\n" in markdown: + markdown = markdown.replace("\n\n\n", "\n\n") + + return markdown.strip() + + +def convert_js_to_py_dict(js_text: str) -> dict: + # Convert javascript dictionary to python dictionary + js_text = js_text.replace("define(", "").replace(");", "").replace("'", '"') + js_text = re.sub(r'//.*', '', js_text) + js_text = re.sub(r'/\*[\s\S]*?\*/', '', js_text) + js_text = re.sub(r',\s*(?=[}\]])', '', js_text) + js_text = re.sub(r'([{\[,]\s*)([A-Za-z_][A-Za-z0-9_]*)\s*:', r'\1"\2":', js_text) + return json.loads(js_text) diff --git a/tools/user_manager.py b/tools/user_manager.py index 7465dca..f6f816d 100644 --- a/tools/user_manager.py +++ b/tools/user_manager.py @@ -36,10 +36,11 @@ def register(mcp, token: Optional[PerfectoToken]): ) async def user( action: str = Field(description="The action id to execute"), - args: Dict[str, Any] = Field(description="Dictionary with parameters"), + args: Dict[str, Any] = Field(description="Dictionary with parameters", default=None), ctx: Context = Field(description="Context object providing access to MCP capabilities") ) -> BaseResult: - + if args is None: + args = {} user_manager = UserManager(token, ctx) try: match action: diff --git a/tools/utils.py b/tools/utils.py index 8920fb4..f29a3ae 100644 --- a/tools/utils.py +++ b/tools/utils.py @@ -1,15 +1,16 @@ """ Simple utilities for Perfecto MCP tools. """ -import json +import base64 +import os import platform -import re +import sys from datetime import datetime +from importlib import resources +from pathlib import Path from typing import Optional, Callable -from urllib.parse import urljoin import httpx -import lxml.html from config.token import PerfectoToken from config.version import __version__ @@ -106,300 +107,20 @@ def get_date_time_iso(timestamp: int) -> Optional[str]: else: return datetime.fromtimestamp(timestamp).isoformat() - -def clean_text(text, preserve_newlines=False): - text = text.replace('\xa0', ' ') - - if preserve_newlines: - lines = text.split('\n') - cleaned_lines = [' '.join(line.split()) for line in lines] - return '\n'.join(cleaned_lines).strip() - else: - text = ' '.join(text.split()) - return text.strip() - - -def extract_text_with_br(element): - html_str = lxml.html.tostring(element, encoding='unicode', method='html') - html_str = html_str.replace('
', '\n').replace('
', '\n').replace('
', '\n') - temp = lxml.html.fromstring(html_str) - return temp.text_content() - - -def table_to_markdown(table, base_url=None, as_html=True): - rows = table.xpath('.//tr') - if not rows: - return "" - - markdown = [] - - if as_html: - markdown.append("") - - header_row = table.xpath('.//thead//tr[1] | .//tr[1]') - if header_row: - headers = [] - for th in header_row[0].xpath('.//th | .//td'): - header_text = process_inline_elements(th, base_url, as_html) - header_text = clean_text(header_text) - headers.append(header_text) - - if headers and any(h for h in headers if h): - if as_html: - header_html = "" - for header in headers: - header_html += "" - header_html = "" + header_html + "" - markdown.append(header_html) - else: - markdown.append("| " + " | ".join(headers) + " |") - markdown.append("| " + " | ".join(["---"] * len(headers)) + " |") - start_idx = 1 - else: - start_idx = 0 - else: - start_idx = 0 - - if as_html: - markdown.append("") - - for row in rows[start_idx:]: - if as_html: - markdown.append("") - cells = row.xpath('.//td | .//th') - if cells: - cell_texts = [] - for cell in cells: - cell_text = process_inline_elements(cell, base_url, as_html) - cell_text = clean_text(cell_text) - cell_texts.append(cell_text) - - if any(cell_texts): - if as_html: - cells_html = "" - for cell in cell_texts: - cells_html += "" - markdown.append(cells_html) - else: - markdown.append("| " + " | ".join(cell_texts) + " |") - if as_html: - markdown.append("") - - if as_html: - markdown.append("
" + header + "
" + cell.replace("\n", "
") + "
") - - return "\n".join(markdown) if markdown else "" - - -def process_inline_elements(element, base_url=None, as_html=False): - parts = [] - - if element.text: - parts.append(element.text) - - for child in element: - tag = child.tag.lower() - - if tag == 'a': - href = child.get('href', '') - text = child.text_content().strip() - - if href and base_url: - href = urljoin(base_url, href) - - if text.lower() in ['copy', 'link', ''] or 'javascript:' in href: - if child.tail: - parts.append(child.tail) - continue - - if text and href: - if as_html: - parts.append(f"{text}") - else: - parts.append(f"[{text}]({href})") - elif text: - parts.append(text) - - elif tag == 'br': - if as_html: - parts.append('
') - else: - parts.append('\n') - elif tag in ['strong', 'b']: - text = child.text_content().strip() - if text: - if as_html: - parts.append(f"{text}") - else: - parts.append(f"**{text}**") - elif tag in ['em', 'i']: - text = child.text_content().strip() - if text: - if as_html: - parts.append(f"{text}") - else: - parts.append(f"*{text}*") - elif tag == 'code': - text = child.text_content().strip() - if text: - if as_html: - parts.append(f"{text}") - else: - parts.append(f"`{text}`") +def get_resources_path(): + try: + resources_path = resources.files("resources") + except ModuleNotFoundError: + # Fallback for development or if not installed as package + if getattr(sys, 'frozen', False): + base_path = sys._MEIPASS else: - inner_result = process_inline_elements(child, base_url) - if inner_result: - parts.append(inner_result) - - if child.tail: - parts.append(child.tail) - - return ''.join(parts) - - -def element_to_markdown(element, base_url=None, level=0): - code_block_lang = ['javascript', 'java', 'python', 'ruby', 'go', 'php', 'c#', 'csharp', 'typescript', - 'bash', 'shell', 'sql', 'json', 'xml', 'yaml', 'css', 'html'] - - tag = element.tag.lower() - result = [] - - # Headers - if tag in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']: - level_num = int(tag[1]) - text = clean_text(element.text_content()) - if text: - result.append(f"{'#' * level_num} {text}\n") - - elif tag == 'p': - text = clean_text(process_inline_elements(element, base_url)) - if text: - result.append(f"{text}\n") - - elif tag == 'ul': - for li in element.xpath('./li'): - text = clean_text(process_inline_elements(li, base_url)) - if text: - result.append(f"- {text}\n") - result.append("") - - elif tag == 'ol': - for i, li in enumerate(element.xpath('./li'), 1): - text = clean_text(process_inline_elements(li, base_url)) - if text: - result.append(f"{i}. {text}\n") - result.append("") - - elif tag == 'table': - table_md = table_to_markdown(element, base_url) - if table_md: - result.append(f"{table_md}\n") - - elif tag == 'pre' or 'codesnippet' in element.get('class', '').lower(): - lang = "" - code_element = element - - # Try to get the language code and the source code - - lang_elem = element.xpath('.//*[contains(@class, "language")]') - if not lang_elem: - for child in element: - child_text = child.text_content().strip().lower() - if child_text in code_block_lang: - lang = child_text - break - - code_children = element.xpath('.//code') - if code_children: - code_element = code_children[0] - class_attr = code_element.get('class', '') - if 'language-' in class_attr: - lang = class_attr.split('language-')[1].split()[0] - - code_text = extract_text_with_br(code_element) - code_text = clean_text(code_text, preserve_newlines=True) - - lines = code_text.split('\n') - filtered_lines = [] - - for i, line in enumerate(lines): - line_stripped = line.strip().lower() - if line_stripped == 'copy': # Exclude Copy element (UI Element) - continue - if line_stripped in code_block_lang: - lang = line_stripped - - filtered_lines.append(line) - - code_text = '\n'.join(filtered_lines).strip() - - if code_text: - result.append(f"```{lang}\n{code_text}\n```\n") - - elif tag == 'blockquote': - text = extract_text_with_br(element) - text = clean_text(text) - if text: - for line in text.split('\n'): - if line.strip(): - result.append(f"> {line}\n") - result.append("") - - elif tag == 'hr': - result.append("---\n") - - elif tag == 'img': - alt = element.get('alt', '') - src = element.get('src', '') - if src and base_url: - src = urljoin(base_url, src) - result.append(f"![{alt}]({src})\n") - - elif tag in ['script', 'style', 'noscript', 'meta', 'link', 'head']: - # Ignore - pass - - # For any others elements, process the children - else: - for child in element: - child_md = element_to_markdown(child, base_url, level + 1) - if child_md: - result.extend(child_md) - - return result - - -def html_to_markdown(html_content, base_url=None): - tree = lxml.html.fromstring(html_content) - - main_div = tree.xpath('//div[@role="main"]') - if not main_div: - main_div = tree.xpath('//main | //div[@class="main"] | //article | //body') - - if not main_div: - return "# Error\n\nMain content not found" - - main_div = main_div[0] - - markdown_lines = [] - for child in main_div: - result = element_to_markdown(child, base_url) - if result: - markdown_lines.extend(result) - - markdown = "".join(markdown_lines) - - while "\n\n\n" in markdown: - markdown = markdown.replace("\n\n\n", "\n\n") - - return markdown.strip() - - -def convert_js_to_py_dict(js_text: str) -> dict: - # Convert javascript dictionary to python dictionary - js_text = js_text.replace("define(", "").replace(");", "").replace("'", '"') - js_text = re.sub(r'//.*', '', js_text) - js_text = re.sub(r'/\*[\s\S]*?\*/', '', js_text) - js_text = re.sub(r',\s*(?=[}\]])', '', js_text) - js_text = re.sub(r'([{\[,]\s*)([A-Za-z_][A-Za-z0-9_]*)\s*:', r'\1"\2":', js_text) - return json.loads(js_text) + base_path = os.path.dirname(os.path.abspath(__file__)) + resources_path = Path(base_path) / 'resources' + return resources_path + +def get_mcp_icon_uri(): + name = "app.png" + icon_path = get_resources_path().joinpath(name) + icon_data = base64.standard_b64encode(icon_path.read_bytes()).decode() + return f"data:image/png;base64,{icon_data}" From a45623927c4161e471fc4c72df4a06eb4cd5b98b Mon Sep 17 00:00:00 2001 From: David <3dgiordano@gmail.com> Date: Thu, 6 Nov 2025 19:23:01 -0300 Subject: [PATCH 2/4] Skills Tool --- pyproject.toml | 2 +- server.py | 2 + tools/skills_manager.py | 150 +++++++++++++++++++++++++++ tools/skills_utils.py | 225 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 378 insertions(+), 1 deletion(-) create mode 100644 tools/skills_manager.py create mode 100644 tools/skills_utils.py diff --git a/pyproject.toml b/pyproject.toml index a45dc3d..8df648d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,4 +26,4 @@ where = ["."] include = ["tools", "config", "models", "formatters", "resources"] [tool.setuptools.package-data] -"resources" = ["*.png"] +"resources" = ["**/*"] diff --git a/server.py b/server.py index 8010c8f..92dd9a4 100644 --- a/server.py +++ b/server.py @@ -5,6 +5,7 @@ from tools.device_manager import register as register_device_manager from tools.execution_manager import register as register_execution_manager from tools.help_manager import register as register_help_manager +from tools.skills_manager import register as register_skills_manager def register_tools(mcp, token: Optional[PerfectoToken]): """ @@ -18,3 +19,4 @@ def register_tools(mcp, token: Optional[PerfectoToken]): register_device_manager(mcp, token) register_execution_manager(mcp, token) register_help_manager(mcp, token) + register_skills_manager(mcp, token) diff --git a/tools/skills_manager.py b/tools/skills_manager.py new file mode 100644 index 0000000..f9f0106 --- /dev/null +++ b/tools/skills_manager.py @@ -0,0 +1,150 @@ +import traceback +from typing import Optional, Dict, Any + +import httpx +from mcp.server.fastmcp import Context +from pydantic import Field + +from config.perfecto import TOOLS_PREFIX, SUPPORT_MESSAGE +from config.token import PerfectoToken +from models.manager import Manager +from models.result import BaseResult +from tools.skills_utils import list_skills, read_skill_definition, read_skill_file, parse_skill_uri, \ + is_skill_uri, list_skill_resources_uri + + +# This it's based on the ideas behind Anthropic Skills +# More info about Skills https://github.com/anthropics/skills + +class SkillsManager(Manager): + skills = None # Static to share between different instance of HelpManager + + def __init__(self, token: Optional[PerfectoToken], ctx: Context): + super().__init__(token, ctx) + + @staticmethod + async def list_skills() -> BaseResult: + errors = [] + if SkillsManager.skills is None: + skills, errors = list_skills() + SkillsManager.skills = skills + + return BaseResult( + result=SkillsManager.skills, + error=errors[0] if errors and len(errors) > 0 else None # Only the first error + ) + + @staticmethod + async def read_skill(skill_name: str) -> BaseResult: + skill_content, error = read_skill_definition(skill_name) + return BaseResult( + result={ + "skill_name": skill_name, + "path": "SKILL.md", + "content": skill_content, + }, + error=error + ) + + @staticmethod + async def read_skill_file_path(skill_name: str, file_path: str) -> BaseResult: + skill_content, error = read_skill_file(skill_name, file_path) + return BaseResult( + result={ + "skill_name": skill_name, + "path": file_path, + "content": skill_content, + }, + error=error + ) + + @staticmethod + async def list_skill_resources(skill_name: str) -> BaseResult: + skill_resources = list_skill_resources_uri(skill_name) + return BaseResult( + result={ + "skill_name": skill_name, + "resources": skill_resources, + } + ) + + @staticmethod + async def read_skill_resource_uri(skill_uri: str) -> BaseResult: + if is_skill_uri(skill_uri): + skill_name, file_path = parse_skill_uri(skill_uri) + skill_content, error = read_skill_file(skill_name, file_path) + return BaseResult( + result={ + "skill_name": skill_name, + "path": file_path, + "content": skill_content, + }, + error=error + ) + else: + return BaseResult( + error=f"Invalid Skill URI: {skill_uri}" + ) + + +def register(mcp, token: Optional[PerfectoToken]): + @mcp.resource("skills-{skill_name}://{path}") + def universal_skills_handler(skill_name: str, path: str) -> BaseResult: + content, error = read_skill_file(skill_name, path) + return BaseResult( + result={ + "skill_name": skill_name, + "path": path, + "content": content, + }, + error=error + ) + + @mcp.tool( + name=f"{TOOLS_PREFIX}_skills", + description=""" +Operations to obtain Skills around Perfecto. +Actions: +- list_skills: List all the Skills available to learn. +- read_skill: Read detailed information about a specific skill_name. + args(dict): Dictionary with the following required parameters: + skill_name (str): The skill name. +- list_skill_resources: List all the Skills Resources available to learn. + args(dict): Dictionary with the following required parameters: + skill_name (str): The skill name. +- read_skill_resource_uri: Read file content based on a Skill Resource URI (skill-{skill_name}://{resource_path}). + args(dict): Dictionary with the following required parameters: + skill_resource_uri (str): The skill URI. + +""" + ) + async def skills( + action: str = Field(description="The action id to execute"), + args: Dict[str, Any] = Field(description="Dictionary with parameters", default=None), + ctx: Context = Field(description="Context object providing access to MCP capabilities") + ) -> BaseResult: + if args is None: + args = {} + skills_manager = SkillsManager(token, ctx) + try: + match action: + case "list_skills": + return await skills_manager.list_skills() + case "read_skill": + return await skills_manager.read_skill(args.get("skill_name", "")) + case "list_skill_resources": + return await skills_manager.list_skill_resources(args.get("skill_name", "")) + case "read_skill_resource_uri": + return await skills_manager.read_skill_resource_uri(args.get("skill_resource_uri", "")) + case _: + return BaseResult( + error=f"Action {action} not found in skills manager tool" + ) + except httpx.HTTPStatusError: + return BaseResult( + error=f"Error: {traceback.format_exc()}" + ) + except Exception: + return BaseResult( + error=f"Error: {traceback.format_exc()}\n{SUPPORT_MESSAGE}" + ) diff --git a/tools/skills_utils.py b/tools/skills_utils.py new file mode 100644 index 0000000..2e24700 --- /dev/null +++ b/tools/skills_utils.py @@ -0,0 +1,225 @@ +import os +import re +from pathlib import Path +from typing import Tuple, Dict, List + +from tools.utils import get_resources_path + +SKILL_URI_REGEX = re.compile(r"^skill-(?P[a-zA-Z0-9_-]+)://(?P.+)$") + + +def parse_frontmatter(frontmatter: str) -> Dict[str, str]: + meta = {} + lines = frontmatter.splitlines() + current_key = None + current_value = [] + block_mode = False + + for line in lines: + stripped = line.strip() + if not stripped or stripped.startswith('#'): + continue # Skip empty or comment lines + + if ':' in line and not line.startswith((' ', '\t')): # New key at root level + if current_key: + value_str = '\n'.join(current_value).strip() + value_str = str(value_str).strip('\'"') # Strip outer quotes if present + meta[current_key] = value_str + try: + key, value = [part.strip() for part in line.split(':', 1)] + value = str(value).strip('\'"') # Strip quotes from single-line value + except ValueError: + raise ValueError("Invalid key-value format in frontmatter") + + if key not in ('name', 'description'): + continue # Ignore unexpected keys + + if value in ('|', '>'): + block_mode = True + current_value = [] + else: + block_mode = False + current_value = [value] + + current_key = key + elif current_key and (block_mode or line.startswith((' ', '\t'))): + # Append to multi-line block (indented or in block mode) + current_value.append(line.rstrip() if block_mode else line.strip()) + else: + raise ValueError("Malformed frontmatter structure") + + if current_key: + value_str = '\n'.join(current_value).strip() + value_str = value_str.strip('\'"') # Strip outer quotes if present + meta[current_key] = value_str + + # Ensure required keys are present after parsing + if 'name' not in meta or 'description' not in meta: + raise ValueError("Missing 'name' or 'description' in frontmatter") + + return meta + + +def list_skills() -> Tuple[List[Dict], List[str]]: + full_skills_dir = os.path.join(get_resources_path(), 'skills') + skills = [] + errors = [] + skills_path = Path(full_skills_dir) + + if not os.path.exists(skills_path): + # Graceful handling + return [], [f"Missing skills directory: {skills_path}"] + + for folder in os.listdir(skills_path): + folder_path = skills_path / folder + if folder_path.is_dir(): + md_path = folder_path / 'SKILL.md' + if md_path.exists(): + skill, error = read_skill_meta(md_path) + if skill is not None: + skills.append(skill) + if error is not None: + errors.append(error) + else: + errors.append(f"Skill file not found: {md_path}") + else: + errors.append(f"Invalid Skill folder {folder_path}") + return skills, errors + + +def validate_skill_content(content, md_path): + if not content.startswith('---'): + return False, "No YAML frontmatter found" + + match = re.match(r'^---\n(.*?)\n---', content, re.DOTALL) + if not match: + return False, "Invalid frontmatter format" + + frontmatter = match.group(1) + + if 'name:' not in frontmatter: + return False, "Missing 'name' in frontmatter" + if 'description:' not in frontmatter: + return False, "Missing 'description' in frontmatter" + + try: + meta = parse_frontmatter(frontmatter) + name = meta.get('name') + description = meta.get('description') + + if not name or not description: + return False, "Name or description is empty" + + # Validate name: hyphen-case + if not re.match(r'^[a-z0-9-]+$', name): + return False, f"Name '{name}' should be hyphen-case (lowercase letters, digits, and hyphens only)" + if name.startswith('-') or name.endswith('-') or '--' in name: + return False, f"Name '{name}' cannot start/end with hyphen or contain consecutive hyphens" + + # Validate description: no angle brackets + if '<' in description or '>' in description: + return False, "Description cannot contain angle brackets (< or >)" + + except ValueError as e: + return False, f"Error parsing frontmatter in {md_path}: {str(e)}" + + return True, "Skill is valid" + + +def read_skill_meta(md_path) -> Tuple[Dict[str, None], str | None]: + skill = None + error = None + content = md_path.read_text(encoding='utf-8') + valid, message = validate_skill_content(content, md_path) + if valid: + match = re.match(r'^---\n(.*?)\n---', content, re.DOTALL) + if match: + frontmatter = match.group(1) + try: + meta = parse_frontmatter(frontmatter) + name = meta.get('name') + description = meta.get('description') + if name and description: + skill = { + 'name': name, + 'description': description + } + except ValueError as e: + error = f"Error parsing frontmatter in {md_path}: {str(e)}" + else: + error = f"Validation failed for {md_path}: {message}" + return skill, error + + +def get_skill_file_path(skill_name: str, file_path: str) -> Path: + full_skills_dir = os.path.join(get_resources_path(), 'skills') + skills_path = Path(full_skills_dir) + return skills_path / skill_name / file_path + + +def replace_skills_markdown_links(content: str, skill_name: str, file_path: str) -> str: + # Markdown Link Pattern []() + pattern = re.compile(r'\[([^\]]+)\]\(([^)]+)\)') + + def replacer(match): + text, url = match.groups() + # Detect when not it's a protocol link + if not re.match(r'^[a-zA-Z][a-zA-Z0-9+.-]*://', url): + # Only when it's relative to local, generate the uri format + # Transform the relative path to absolute and cut the base path + skills_path = Path(os.path.join(get_resources_path(), 'skills')) + base_path = skills_path / skill_name / file_path + url = (base_path.parent / url).resolve() + try: + url = url.relative_to(skills_path / skill_name).as_posix() + new_url = f"skill-{skill_name}://{url}" + return f"[{text}]({new_url})" + except: + return match.group(0) # In case the relative not it's valid, use the original + else: + return match.group(0) # Other case return the original + + return pattern.sub(replacer, content) + + +def read_skill_file(skill_name: str, file_path: str) -> Tuple[str | None, str | None]: + skill_file_path = get_skill_file_path(skill_name, file_path) + if skill_file_path.exists(): + content = skill_file_path.read_text(encoding='utf-8') + if file_path.endswith('.md'): # Only on Markdown replace to skills uri format + content = replace_skills_markdown_links(content, skill_name, file_path) + if file_path.endswith('SKILL.md'): + valid, message = validate_skill_content(content, file_path) + if valid: + return content, None + else: + return None, f"Validation failed for {skill_name}: {message}" + else: + return content, None + else: + return None, f"Skill file not found: {skill_file_path}" + + +def is_skill_uri(uri: str) -> bool: + return bool(SKILL_URI_REGEX.match(uri)) + + +def parse_skill_uri(uri: str) -> Tuple[str, str]: + match = SKILL_URI_REGEX.match(uri) + if not match: + raise ValueError(f"Invalid Skill URI : {uri}") + return match.group("skill_name"), match.group("path") + +def list_skill_resources_uri(skill_name: str) -> List[str]: + full_skills_dir = os.path.join(get_resources_path(), 'skills') + skills_path = Path(full_skills_dir) + skill_path = skills_path / skill_name + skill_resources = [] + for file_path in skill_path.rglob("*"): + if file_path.is_file(): + url = file_path.relative_to(skills_path / skill_name).as_posix() + skill_resources.append(f"skill-{skill_name}://{url}") + return skill_resources + +def read_skill_definition(skill_name: str) -> Tuple[str | None, str | None]: + return read_skill_file(skill_name, 'SKILL.md') From 59e6070d4b280b53101c2dedbfcd2672c26bba5f Mon Sep 17 00:00:00 2001 From: David <3dgiordano@gmail.com> Date: Wed, 26 Nov 2025 13:31:35 -0300 Subject: [PATCH 3/4] Generalization of resource inclusion to include skills --- build.py | 2 +- main.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/build.py b/build.py index 1d27f88..3b7ea70 100644 --- a/build.py +++ b/build.py @@ -81,7 +81,7 @@ def build(): '--onefile', '--version-file=version_info.txt', f'--add-data=pyproject.toml{sep}.', - f'--add-data=resources/app.png{sep}resources', + f'--add-data=resources{sep}resources', f'--name={name}', f'--icon={icon}', '--clean', diff --git a/main.py b/main.py index 6e68418..676bb86 100644 --- a/main.py +++ b/main.py @@ -65,6 +65,14 @@ def run(log_level: str = "CRITICAL"): instructions = """ # Perfecto MCP Server +Use Perfecto MCP (Model Context Protocol) to interact with Perfecto cloud services. +Use this MCP Server when you need to query devices, retrieve execution reports, access help documentation, +manage scriptless tests, explore available skills, or get user information programmatically through the MCP interface. + +Use perfecto_skills tool to discover Perfecto skills. +Start with perfecto-mcp-tools skill to understand available MCP tools. +Then discover and read relevant skills based on user queries. + """ mcp = FastMCP("perfecto-mcp", instructions=instructions, From 6e40935b3cfa105e2ff7c0f3249feba851b56d24 Mon Sep 17 00:00:00 2001 From: David <3dgiordano@gmail.com> Date: Mon, 8 Dec 2025 14:12:00 -0300 Subject: [PATCH 4/4] Fix comment --- tools/skills_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/skills_manager.py b/tools/skills_manager.py index f9f0106..1fa7dec 100644 --- a/tools/skills_manager.py +++ b/tools/skills_manager.py @@ -17,7 +17,7 @@ # More info about Skills https://github.com/anthropics/skills class SkillsManager(Manager): - skills = None # Static to share between different instance of HelpManager + skills = None # Static to share between different instance of SkillsManager def __init__(self, token: Optional[PerfectoToken], ctx: Context): super().__init__(token, ctx)