From a367d57883fb5d605036f2fa63c1f230b2d9e4ef Mon Sep 17 00:00:00 2001 From: idelcano Date: Fri, 21 Nov 2025 18:16:51 +0100 Subject: [PATCH 1/6] added auto dependency vulnerabilities checker --- security-scanner/.gitignore | 2 + security-scanner/bom_tool.py | 127 ++++ security-scanner/dependency_track_tool.py | 581 ++++++++++++++++++ .../dependency-track/docker-compose.base.yml | 27 + .../dependency-track/docker-compose.trivy.yml | 18 + security-scanner/env_file.py | 85 +++ security-scanner/security_assistant.py | 321 ++++++++++ security-scanner/snyk_tool.py | 114 ++++ security-scanner/trivy_tool.py | 58 ++ security-scanner/utils.py | 114 ++++ 10 files changed, 1447 insertions(+) create mode 100644 security-scanner/.gitignore create mode 100644 security-scanner/bom_tool.py create mode 100644 security-scanner/dependency_track_tool.py create mode 100644 security-scanner/docker/dependency-track/docker-compose.base.yml create mode 100644 security-scanner/docker/dependency-track/docker-compose.trivy.yml create mode 100644 security-scanner/env_file.py create mode 100644 security-scanner/security_assistant.py create mode 100644 security-scanner/snyk_tool.py create mode 100644 security-scanner/trivy_tool.py create mode 100644 security-scanner/utils.py diff --git a/security-scanner/.gitignore b/security-scanner/.gitignore new file mode 100644 index 00000000..a16add54 --- /dev/null +++ b/security-scanner/.gitignore @@ -0,0 +1,2 @@ +.env +tmp diff --git a/security-scanner/bom_tool.py b/security-scanner/bom_tool.py new file mode 100644 index 00000000..3ebd76d3 --- /dev/null +++ b/security-scanner/bom_tool.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Optional + +from utils import ConsoleUtils + + +class BomTool: + """Generates a CycloneDX BOM (bom.json) for a Node-based project.""" + + def __init__(self, utils: ConsoleUtils) -> None: + self.u = utils + self.node_version: Optional[str] = None # Node version to use via nvm (optional) + + def run( + self, + app_root: str, + node_version: Optional[str] = None, + interactive: bool = True, + ) -> Optional[Path]: + """ + Orchestrates BOM generation: + - Ask (optional) Node.js version (via nvm) + - Run npm install + CycloneDX BOM generation + """ + self.u.print_header("Step 2 - CycloneDX BOM generation") + + if interactive and node_version is None: + self._ask_node_version() + else: + self.node_version = node_version + + return self._generate_bom(app_root) + + # ------------------------------------------------------------------ + # Node.js version (optional, via nvm) + # ------------------------------------------------------------------ + + def _ask_node_version(self) -> None: + self.u.print_step("Node.js version (optional)") + + print( + "By default this step will use whatever 'node' is in your PATH.\n" + "If you know a specific Node.js version (managed by nvm) that should be used\n" + "for this project (for example 20.19.0), you can enter it here.\n" + "If you later see 'npm WARN EBADENGINE Unsupported engine', you can cancel\n" + "with Ctrl+C and rerun this assistant with another Node version.\n" + ) + + version = input("Node.js version to use with nvm (leave empty to use current): ").strip() + self.node_version = version or None + if self.node_version: + print(f"Will try to use Node.js {self.node_version} via nvm.") + else: + print("Using current Node.js from PATH (no nvm override).") + + # ------------------------------------------------------------------ + # BOM generation (single shell command) + # ------------------------------------------------------------------ + + def _generate_bom(self, app_root: str) -> Optional[Path]: + """ + Generate a CycloneDX BOM (bom.json) using npm + npx in a single shell command. + This will: + - Print node -v + - Run npm install + - Run npx @cyclonedx/cyclonedx-npm to produce bom.json + Optionally, it will switch Node.js version using nvm first. + """ + + if not self.u.find_executable("npm"): + print("❌ npm is not available in PATH. Cannot generate BOM.") + return None + + print( + "\nℹ️ If you see 'npm WARN EBADENGINE Unsupported engine', it means the project\n" + " expects a different Node.js version (for example ^20.19.0).\n" + " In that case, you can cancel with Ctrl+C and rerun this assistant with\n" + " a different Node version when prompted.\n" + ) + + self.u.print_step("Installing dependencies and generating CycloneDX BOM (npm install + npx)") + + if self.node_version: + # Single shell command: + # - Load nvm (if available) + # - nvm install + # - nvm use + # - node -v + # - npm install + # - npx @cyclonedx/cyclonedx-npm ... + shell_cmd = ( + "source ~/.nvm/nvm.sh 2>/dev/null || " + "source /usr/share/nvm/nvm.sh 2>/dev/null || " + "source /usr/local/opt/nvm/nvm.sh 2>/dev/null || true; " + f"nvm install {self.node_version}; " + f"nvm use {self.node_version}; " + "node -v; " + "npm install; " + "npx @cyclonedx/cyclonedx-npm " + "--output-file bom.json " + "--output-format json" + ) + rc = self.u.run_command(["bash", "-lc", shell_cmd], cwd=app_root) + else: + # No nvm: use whatever Node.js is in PATH + shell_cmd = ( + "node -v; " + "npm install; " + "npx @cyclonedx/cyclonedx-npm " + "--output-file bom.json " + "--output-format json" + ) + rc = self.u.run_command(["bash", "-lc", shell_cmd], cwd=app_root) + + if rc != 0: + print("❌ Failed to generate bom.json (npm install or npx failed).") + return None + + bom_path = Path(app_root) / "bom.json" + if not bom_path.is_file(): + print("❌ bom.json not found after generation.") + return None + + print(f"✅ BOM generated at: {bom_path}") + return bom_path diff --git a/security-scanner/dependency_track_tool.py b/security-scanner/dependency_track_tool.py new file mode 100644 index 00000000..cb8b35f3 --- /dev/null +++ b/security-scanner/dependency_track_tool.py @@ -0,0 +1,581 @@ +from __future__ import annotations + +import os +from pathlib import Path +from typing import Optional +from urllib.parse import urlparse, urlunparse + +import requests + +from utils import ConsoleUtils +from env_file import EnvFile + + +class DependencyTrackTool: + """ + Handles Dependency-Track orchestration and BOM upload. + + It does NOT generate the BOM. It expects an existing bom.json path. + + Two main modes: + 1) Local mode: start Dependency-Track via docker compose on this machine. + 2) Remote mode: connect to an already running Dependency-Track instance. + """ + + def __init__(self, utils: ConsoleUtils) -> None: + self.u = utils + + # Default host ports for a local instance started via docker compose. + self.api_port: str = "8081" # host port for API server + self.ui_port: str = "8080" # host port for Web UI + + # Optional Trivy analyzer support when running a local stack. + self.enable_trivy_analyzer: bool = False + self.trivy_token: Optional[str] = None # Token used by the Trivy server (if enabled) + + # ------------------------------------------------------------------ + # Public entry point + # ------------------------------------------------------------------ + + def run( + self, + bom_path: Path, + project_name: str, + project_version: str, + enable_trivy_analyzer: bool = False, + api_port: str = "8081", + ui_port: str = "8080", + api_url: Optional[str] = None, + api_key: Optional[str] = None, + interactive: bool = True, + ) -> bool: + """ + Orchestrates Dependency-Track usage: + + - Ask whether to: + a) Start a local Dependency-Track instance with docker compose. + b) Connect to an already running Dependency-Track instance. + - Local mode: + - Ask ports + - Start docker-compose (with or without Trivy server) + - Use the local instance URL for the upload + - Remote mode: + - Ask for the Dependency-Track API base URL + - If api_url is provided, remote mode is assumed automatically + - Upload the given BOM to the chosen Dependency-Track instance. + - If interactive=False, rely entirely on the provided parameters / env vars. + """ + self.u.print_header("Step 4 - Dependency-Track (docker-compose / existing instance + upload)") + + if not bom_path.is_file(): + print(f"❌ BOM file not found: {bom_path}. Generate bom.json before using Dependency-Track.") + return False + + self.enable_trivy_analyzer = enable_trivy_analyzer + self.api_port = api_port or self.api_port + self.ui_port = ui_port or self.ui_port + + # Decide whether to start a local instance or connect to an existing one + script_dir = Path(__file__).resolve().parent + docker_dir = script_dir / "docker" / "dependency-track" + selected_mode = "local" + + if api_url: + print("Using Dependency-Track server provided via CLI argument.") + selected_mode = "remote" + elif interactive: + use_local_stack = self.u.ask_yes_no( + "Do you want this assistant to start Dependency-Track locally with Docker?", + default=True, + ) + selected_mode = "local" if use_local_stack else "remote" + else: + selected_mode = "local" + + if selected_mode == "local": + if not self._run_local_dependency_track(docker_dir, interactive=interactive): + return False + else: + if not self._configure_existing_dependency_track_url( + interactive=interactive, + provided_url=api_url, + ): + return False + + # Upload the BOM using whatever URL configuration we ended up with. + return self._upload_bom_to_dependency_track( + bom_path=bom_path, + project_name=project_name, + project_version=project_version, + api_url_override=api_url if not interactive else None, + api_key_override=api_key, + interactive=interactive, + ) + + # ------------------------------------------------------------------ + # Mode selection helpers + # ------------------------------------------------------------------ + + def _run_local_dependency_track(self, docker_dir: Path, interactive: bool) -> bool: + """ + Start a local Dependency-Track stack using docker compose. + + This will: + - Ask for host ports (API/UI). + - Optionally resolve a Trivy token and configure the Trivy server. + - Start docker compose in the provided docker_dir. + + It also sets a default DTRACK_API_URL pointing to the local API port, + so the upload step can use it if the user has not defined one. + """ + self._ask_ports(interactive=interactive) + + # If the Trivy analyzer is enabled, make sure we have a token ready + if self.enable_trivy_analyzer: + token = self._resolve_trivy_token(interactive=interactive) + if not token: + return False + self.trivy_token = token + else: + self.trivy_token = None + + if not self._ensure_dependency_track_running(docker_dir): + return False + + # If DTRACK_API_URL is not already defined, default to the local instance + os.environ.setdefault("DTRACK_API_URL", f"http://localhost:{self.api_port}") + return True + + def _configure_existing_dependency_track_url( + self, + interactive: bool, + provided_url: Optional[str] = None, + ) -> bool: + """ + Configure connection to an already running Dependency-Track instance. + + This method: + - Uses environment variable DTRACK_API_URL if present. + - Otherwise tries to load a previously stored URL from assistant .env. + - Falls back to 'http://localhost:8081' as a generic default. + - Asks the user to confirm or override the URL. + - Normalizes it so it represents the base URL (without /api/v1/bom). + - Persists the chosen URL in the assistant .env file and in os.environ. + """ + self.u.print_step("Connecting to an existing Dependency-Track instance") + + assistant_root = Path(__file__).resolve().parent + env_file = EnvFile(assistant_root) + + # Load any existing .env variables without overriding real environment + env_file.load_to_environ(overwrite=False) + + # Priority for default URL: + # 1) DTRACK_API_URL environment variable + # 2) dependency_tracker_url in assistant .env + # 3) plain localhost default + default_url = os.environ.get("DTRACK_API_URL") or env_file.read_key("dependency_tracker_url") + normalized_url: Optional[str] = None + + if provided_url: + normalized_url = self._normalize_api_base_url(provided_url.strip()) + elif interactive: + fallback = default_url or "http://localhost:8081" + print( + "\nYou chose to connect to an already running Dependency-Track instance.\n" + "Please provide the base URL of its API server, for example:\n" + " http://dtrack.example.com:8080\n" + " http://localhost:8081\n" + ) + user_url = input(f"Dependency-Track API base URL [{fallback}]: ").strip() or fallback + normalized_url = self._normalize_api_base_url(user_url) + else: + if not default_url: + print( + "❌ DTRACK_API_URL (or dependency_tracker_url in the assistant .env) " + "is required in forced mode when using a remote server." + ) + return False + normalized_url = self._normalize_api_base_url(default_url) + + normalized_url = self._maybe_fix_ui_port(normalized_url) + + # Store in environment and .env for future runs. + os.environ["DTRACK_API_URL"] = normalized_url + if interactive or provided_url: + env_file.write_key("dependency_tracker_url", normalized_url) + + print(f"Using Dependency-Track API base URL: {normalized_url}") + return True + + @staticmethod + def _normalize_api_base_url(url: str) -> str: + """ + Normalize a Dependency-Track URL so it is a base API URL (no /api/v1/bom). + + Examples: + - 'http://host:8081/api/v1/bom' -> 'http://host:8081' + - 'http://host:8081/api/v1' -> 'http://host:8081' + - 'http://host:8081/api' -> 'http://host:8081' + - 'http://host:8081/' -> 'http://host:8081' + """ + # Strip trailing slash to simplify processing + stripped = url.rstrip("/") + + # Remove any known API suffixes if present + for suffix in ("/api/v1/bom", "/api/v1", "/api"): + if stripped.endswith(suffix): + stripped = stripped[: -len(suffix)] + break + + return stripped + + def _maybe_fix_ui_port(self, url: str) -> str: + """ + If the provided URL looks like the Dependency-Track UI port (8080) without an API + path, automatically switch to the default API port (8081) and warn the user. + """ + parsed = urlparse(url) + if not parsed.scheme or not parsed.netloc: + return url + + if parsed.port == 8080 and (parsed.path in ("", "/")): + fixed = parsed._replace(netloc=f"{parsed.hostname}:8081") + new_url = urlunparse(fixed) + print( + "Provided URL appears to point to the UI port (8080). " + "Using the default API port instead: " + f"{new_url}" + ) + return new_url + + return url + + # ------------------------------------------------------------------ + # Ports (local mode only) + # ------------------------------------------------------------------ + + def _ask_ports(self, interactive: bool) -> None: + """Ask the user which host ports to use for the API and UI containers.""" + self.u.print_step("Dependency-Track ports") + + print( + "Default ports are:\n" + f" - API: {self.api_port} (host) → 8080 (container)\n" + f" - UI : {self.ui_port} (host) → 8080 (container)\n" + "\nIf those ports are already in use, you can choose different ones.\n" + ) + + if interactive: + api = input(f"API host port [{self.api_port}]: ").strip() + ui = input(f"UI host port [{self.ui_port}]: ").strip() + + if api: + self.api_port = api + if ui: + self.ui_port = ui + else: + print( + "Using provided ports without prompting " + f"(API={self.api_port}, UI={self.ui_port}) due to forced mode." + ) + + print(f"Using API port: {self.api_port}") + print(f"Using UI port: {self.ui_port}") + + # ------------------------------------------------------------------ + # Trivy token resolution (local + Trivy mode) + # ------------------------------------------------------------------ + + def _resolve_trivy_token(self, interactive: bool) -> Optional[str]: + """ + Resolve the token used by the Trivy server. + + Resolution order: + 1) Environment variable TRIVY_SERVER_TOKEN (if set). + 2) .env file next to this assistant (key: trivy_server_token). + 3) Interactive prompt: + - If user enters a value, use it. + - If left empty, fall back to default 'SECRETTOKEN123'. + + In cases (2) and (3), the token is stored in the assistant's .env file + so it can be reused on future runs. + """ + assistant_root = Path(__file__).resolve().parent + env_file = EnvFile(assistant_root) + + # 1) Try environment variable + env_token = os.environ.get("TRIVY_SERVER_TOKEN") + if env_token: + print("Using Trivy server token from environment variable TRIVY_SERVER_TOKEN.") + print("Remember to configure the same token in the Trivy analyzer in Dependency-Track.") + return env_token + + # 2) Try token from assistant .env + file_token = env_file.read_key("trivy_server_token") + if file_token: + print("Using Trivy server token from assistant .env (trivy_server_token).") + print("Remember to configure the same token in the Trivy analyzer in Dependency-Track.") + return file_token + + # 3) Ask the user; allow an empty value to mean 'use the default' + if not interactive: + print( + "❌ TRIVY_SERVER_TOKEN is required in forced mode. " + "Set it via environment variable or trivy_server_token in the assistant .env." + ) + return None + + default_token = "SECRETTOKEN123" + user_input = input( + f"Enter Trivy server token (leave empty to use default '{default_token}'): " + ).strip() + + token = user_input or default_token + + # Persist in assistant .env for future runs + env_file.write_key("trivy_server_token", token) + + if user_input: + print( + "Using Trivy server token provided by user and storing it in assistant .env " + "(key: trivy_server_token)." + ) + else: + print( + f"Using default Trivy server token '{default_token}' and storing it in assistant .env " + "(key: trivy_server_token)." + ) + + print("You will need to configure the same token in the Trivy analyzer settings in Dependency-Track.") + return token + + # ------------------------------------------------------------------ + # docker-compose helper (.env for ports + Trivy) + # ------------------------------------------------------------------ + + def _write_compose_env(self, docker_dir: Path) -> None: + """ + Write a .env file in the docker-compose directory with the selected ports + and, if enabled, the Trivy configuration (port and token). + + docker-compose.*.yml will use these variables. + """ + env_path = docker_dir / ".env" + lines: list[str] = [ + f"DTRACK_API_PORT={self.api_port}", + f"DTRACK_UI_PORT={self.ui_port}", + ] + # Trivy-related environment variables are only written when the analyzer is enabled + if self.enable_trivy_analyzer: + lines.append("TRIVY_PORT=4954") + if self.trivy_token: + lines.append(f"TRIVY_TOKEN={self.trivy_token}") + + env_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + # ------------------------------------------------------------------ + # Ensure Dependency-Track is running (local mode only) + # ------------------------------------------------------------------ + + def _ensure_dependency_track_running(self, docker_dir: Path) -> bool: + """ + Ensure Dependency-Track is running via docker compose in docker_dir. + + Expects: + - docker-compose.base.yml (always) + - docker-compose.trivy.yml (optional, used when enable_trivy_analyzer=True) + """ + if not self.u.find_executable("docker"): + print("❌ 'docker' is not in PATH. Cannot start Dependency-Track.") + return False + + docker_dir.mkdir(parents=True, exist_ok=True) + + base_compose = docker_dir / "docker-compose.base.yml" + trivy_compose = docker_dir / "docker-compose.trivy.yml" + + if not base_compose.is_file(): + print(f"❌ Missing docker-compose.base.yml in {docker_dir}") + print(" Expected dependency-track docker-compose configuration to be present on disk.") + return False + + # Write .env with selected ports (and Trivy settings) so docker-compose can use them + self._write_compose_env(docker_dir) + + cmd = ["docker", "compose", "-f", str(base_compose)] + + if self.enable_trivy_analyzer and trivy_compose.is_file(): + cmd += ["-f", str(trivy_compose)] + + cmd += ["up", "-d"] + + self.u.print_step("Starting Dependency-Track with 'docker compose up -d'") + rc = self.u.run_command(cmd, cwd=str(docker_dir)) + if rc == 0: + ui_url = f"http://localhost:{self.ui_port}" + + trivy_note = "" + if self.enable_trivy_analyzer and trivy_compose.is_file(): + # Build the API token line depending on whether we actually have a token + if self.trivy_token: + api_token_line = ( + f" API token: {self.trivy_token} " + "(must match the token configured for the Trivy server)\n" + ) + else: + api_token_line = ( + " API token: (none; no token configured for the Trivy server)\n" + ) + + trivy_note = ( + "\nIf this assistant started a Trivy server (trivy-server service), you can configure it in " + "Dependency-Track under:\n" + " Administration → Analyzers → Trivy\n" + "Use the following settings:\n" + " Base URL : http://trivy-server:4954\n" + f"{api_token_line}" + ) + + print( + f"""\nℹ️ Dependency-Track UI: {ui_url} +Initial login for a brand new instance: + username: admin + password: admin + +You will be forced to change the admin password on first login. +Please store the new password in a password manager. + +Recommended next step for automation: + 1) Go to: Administration → Access Management → Teams + 2) Create or select a team (for example "Automation") + 3) Generate an API key for that team + 4) Enter it once in this assistant when asked for "Dependency-Track API Key" +This assistant will store that token in a local .env file next to these scripts and reuse it for future BOM uploads. +{trivy_note}""" + ) + else: + print("⚠️ Failed to start Dependency-Track. Check Docker and docker compose.") + return False + return True + + # ------------------------------------------------------------------ + # BOM upload to Dependency-Track (shared by both modes) + # ------------------------------------------------------------------ + + def _upload_bom_to_dependency_track( + self, + bom_path: Path, + project_name: str, + project_version: str, + api_url_override: Optional[str], + api_key_override: Optional[str], + interactive: bool, + ) -> bool: + """ + Upload the generated BOM to Dependency-Track using the requests library. + + The API key is resolved in this order: + 1) Environment variable DTRACK_API_KEY (if set). + 2) Local .env file next to this assistant (key: dependency_tracker_token). + 3) Interactive prompt; if provided, it is persisted in that .env file. + + The API URL is resolved as follows: + - Environment variable DTRACK_API_URL (if set; can be a base URL). + - If not set, defaults to http://localhost:. + - If the URL does not already end with '/api/v1/bom', this method + will append '/api/v1/bom' to construct the final endpoint. + """ + # Assistant root directory: where this script (and the rest of the assistant) lives. + # We store a reusable .env file here so the API key is global for the assistant, + # not per scanned project. + assistant_root = Path(__file__).resolve().parent + env_file = EnvFile(assistant_root) + + # Load variables from the assistant's .env into the current environment, + # but do not override existing environment variables. + env_file.load_to_environ(overwrite=False) + + self.u.print_step("Preparing BOM upload to Dependency-Track") + + # Resolve API base URL (can come from parameters, environment or be the local default) + api_url = api_url_override or os.environ.get("DTRACK_API_URL") + if not api_url: + if interactive: + api_url = f"http://localhost:{self.api_port}" + else: + print( + "❌ DTRACK_API_URL is required in forced mode. " + "Set it in the environment or dependency_tracker_url in the assistant .env." + ) + return False + api_url = self._maybe_fix_ui_port(api_url) + print(f"Using Dependency-Track API base URL: {api_url}") + if not api_url.endswith("/api/v1/bom"): + api_url = api_url.rstrip("/") + "/api/v1/bom" + print(f"Full BOM endpoint: {api_url}") + + # 1) Try user-provided API key or environment variable + api_key: Optional[str] = api_key_override or os.environ.get("DTRACK_API_KEY") + + # 2) Try token from the assistant's .env under a reusable key + if not api_key: + token_from_file = env_file.read_key("dependency_tracker_token") + if token_from_file: + print("Loading Dependency-Track API token from assistant .env (dependency_tracker_token)") + api_key = token_from_file + + # 3) Ask user as last resort and persist in the assistant's .env for future runs + if not api_key and interactive: + api_key = input("Enter Dependency-Track API Key (leave empty to skip upload): ").strip() + if api_key: + env_file.write_key("dependency_tracker_token", api_key) + + if not api_key: + print( + "❌ Dependency-Track API key is required. " + "In forced mode configure DTRACK_API_KEY or dependency_tracker_token in the assistant .env." + ) + return False + + self.u.print_step("Project details for Dependency-Track") + print(f"Detected project name: {project_name}") + print(f"Detected project version: {project_version}") + if interactive: + project_name = input(f"Project name in Dependency-Track [{project_name}]: ").strip() or project_name + project_version = input(f"Project version [{project_version}]: ").strip() or project_version + + print(f"Using projectName='{project_name}', projectVersion='{project_version}'") + + data = { + "autoCreate": "true", + "projectName": project_name, + "projectVersion": project_version, + } + + headers = {"X-Api-Key": api_key} + + self.u.print_step("Uploading BOM to Dependency-Track via HTTP request") + try: + with bom_path.open("rb") as bom_file: + files = {"bom": (bom_path.name, bom_file, "application/json")} + response = requests.post( + api_url, + data=data, + headers=headers, + files=files, + timeout=120, + ) + except requests.RequestException as exc: + print(f"⚠️ Failed to upload BOM: {exc}") + return False + + if response.ok: + print("✅ BOM uploaded successfully.") + return True + + print( + f"⚠️ Dependency-Track responded with status {response.status_code}: " + f"{response.text}" + ) + return False diff --git a/security-scanner/docker/dependency-track/docker-compose.base.yml b/security-scanner/docker/dependency-track/docker-compose.base.yml new file mode 100644 index 00000000..2cff8e2f --- /dev/null +++ b/security-scanner/docker/dependency-track/docker-compose.base.yml @@ -0,0 +1,27 @@ +version: "3.8" + +services: + dtrack-apiserver: + image: dependencytrack/apiserver + container_name: dtrack-apiserver + ports: + - "${DTRACK_API_PORT:-8081}:8080" # API + environment: + - APP_LOGGING_LEVEL=INFO + volumes: + - dtrack-data:/data + restart: unless-stopped + + dtrack-frontend: + image: dependencytrack/frontend + container_name: dtrack-frontend + ports: + - "${DTRACK_UI_PORT:-8080}:8080" # Web UI + environment: + - API_BASE_URL=http://localhost:${DTRACK_API_PORT:-8081} + depends_on: + - dtrack-apiserver + restart: unless-stopped + +volumes: + dtrack-data: diff --git a/security-scanner/docker/dependency-track/docker-compose.trivy.yml b/security-scanner/docker/dependency-track/docker-compose.trivy.yml new file mode 100644 index 00000000..bb9be1f0 --- /dev/null +++ b/security-scanner/docker/dependency-track/docker-compose.trivy.yml @@ -0,0 +1,18 @@ +version: "3.8" + +services: + trivy-server: + image: aquasec/trivy:latest + container_name: dtrack-trivy-server + ports: + - "${TRIVY_PORT:-4954}:4954" + volumes: + - trivy-cache:/root/.cache/ + # The token is injected via the TRIVY_TOKEN environment variable, + # which is written to the .env file by DependencyTrackTool. + # If TRIVY_TOKEN is not set, it falls back to SECRETTOKEN123. + entrypoint: "trivy server --listen :4954 --token ${TRIVY_TOKEN:-SECRETTOKEN123}" + restart: unless-stopped + +volumes: + trivy-cache: \ No newline at end of file diff --git a/security-scanner/env_file.py b/security-scanner/env_file.py new file mode 100644 index 00000000..d57acecf --- /dev/null +++ b/security-scanner/env_file.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +import os +from pathlib import Path +from typing import Optional + + +class EnvFile: + """Small helper to read/write key=value pairs in a .env-style file.""" + + def __init__(self, env_dir: Path, filename: str = ".env") -> None: + self.path = Path(env_dir) / filename + + def load_to_environ(self, overwrite: bool = False) -> None: + """ + Load all variables from this .env file into os.environ. + + If overwrite is False, existing environment variables are not replaced. + """ + if not self.path.is_file(): + return + + for raw_line in self.path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip() + value = value.strip() + if not key: + continue + if overwrite or key not in os.environ: + os.environ[key] = value + + def read_key(self, key: str) -> Optional[str]: + """ + Read the value for a given key from this .env file, if present. + Returns None if the key is not found. + """ + if not self.path.is_file(): + return None + + for raw_line in self.path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + raw_key, raw_value = line.split("=", 1) + if raw_key.strip() == key: + value = raw_value.strip() + return value or None + return None + + def write_key(self, key: str, value: str) -> None: + """ + Ensure this .env file contains a line 'key=value'. + Existing lines are preserved; the key line is added or updated. + """ + lines: list[str] = [] + if self.path.is_file(): + lines = self.path.read_text(encoding="utf-8").splitlines() + + found = False + new_lines: list[str] = [] + + for raw_line in lines: + line = raw_line.rstrip("\n") + stripped = line.strip() + + if "=" in stripped: + existing_key = stripped.split("=", 1)[0].strip() + else: + existing_key = "" + + if existing_key == key: + new_lines.append(f"{key}={value}") + found = True + else: + new_lines.append(line) + + if not found: + if new_lines and new_lines[-1].strip() != "": + new_lines.append("") + new_lines.append(f"{key}={value}") + + self.path.write_text("\n".join(new_lines) + "\n", encoding="utf-8") diff --git a/security-scanner/security_assistant.py b/security-scanner/security_assistant.py new file mode 100644 index 00000000..8cbde735 --- /dev/null +++ b/security-scanner/security_assistant.py @@ -0,0 +1,321 @@ +from __future__ import annotations + +import sys +import argparse +from dataclasses import dataclass +from pathlib import Path +from typing import Optional, Tuple + +from utils import ConsoleUtils +from snyk_tool import SnykTool +from bom_tool import BomTool +from dependency_track_tool import DependencyTrackTool +from trivy_tool import TrivyTool + + +@dataclass +class SecurityConfig: + app_root: Optional[Path] + forced: bool + run_snyk: bool + run_bom: bool + run_trivy: bool + run_dtrack: bool + node_version: Optional[str] + dtrack_api_port: str + dtrack_ui_port: str + dtrack_api_url: Optional[str] + project_name: Optional[str] + project_version: Optional[str] + + +def build_config(args: argparse.Namespace) -> SecurityConfig: + app_root = Path(args.app_root).resolve() if args.app_root else None + + return SecurityConfig( + app_root=app_root, + forced=args.forced, + run_snyk=not args.no_snyk, + run_bom=not args.no_bom, + run_trivy=not args.no_trivy, + run_dtrack=not args.no_dtrack, + node_version=args.node_version, + dtrack_api_port=args.dtrack_api_port or "8081", + dtrack_ui_port=args.dtrack_ui_port or "8080", + dtrack_api_url=args.dependency_track_server, + project_name=args.project_name, + project_version=args.project_version, + ) + + +class SecurityAssistant: + """Coordinates Snyk, BOM generation, Trivy and Dependency-Track using modular tools.""" + + def __init__(self, config: SecurityConfig) -> None: + self.config = config + self.utils = ConsoleUtils(interactive=not config.forced) + self.snyk = SnykTool(self.utils) + self.bom_tool = BomTool(self.utils) + self.dependency_track = DependencyTrackTool(self.utils) + self.trivy = TrivyTool(self.utils) + + def run(self) -> int: + self.utils.print_header("Security Assistant - Snyk / BOM / Trivy / Dependency-Track") + + try: + app_root = self._resolve_app_root() + project_name, project_version = self._detect_project_metadata( + app_root, + require_name=self.config.project_name is None, + require_version=self.config.project_version is None, + allow_prompt=not self.config.forced, + ) + except ValueError as exc: + print(f"❌ {exc}") + return 1 + + if self.config.project_name: + project_name = self.config.project_name + if self.config.project_version: + project_version = self.config.project_version + + bom_path: Optional[Path] = None + trivy_was_run = False + + # 1) Snyk + if self._should_run_step( + self.config.run_snyk, + "Do you want to run Snyk (snyk test)?", + ): + snyk_ok = self.snyk.run(app_root, non_interactive=self.config.forced) + if not snyk_ok and self.config.forced: + return 1 + else: + print("ℹ️ Skipping Snyk.") + + # 2) BOM generation + if self._should_run_step( + self.config.run_bom, + "Do you want to generate a CycloneDX BOM (bom.json)?", + ): + bom_path = self.bom_tool.run( + app_root, + node_version=self.config.node_version, + interactive=not self.config.forced, + ) + if bom_path is None and self.config.forced: + print("❌ Failed to generate bom.json in forced mode.") + return 1 + else: + print("ℹ️ Skipping BOM generation.") + + # Fallback: if BOM already exists from previous runs + if bom_path is None: + candidate = Path(app_root) / "bom.json" + if candidate.is_file(): + bom_path = candidate + + # 3) Trivy (CLI scan of the BOM) + if self._should_run_step( + self.config.run_trivy, + "Do you want to scan the BOM with Trivy?", + ): + if bom_path is None: + print("❌ No BOM found (bom.json). Generate it before running Trivy.") + if self.config.forced: + return 1 + else: + trivy_ok = self.trivy.run(bom_path) + if not trivy_ok and self.config.forced: + print("❌ Trivy scan failed in forced mode.") + return 1 + trivy_was_run = trivy_ok + else: + print("ℹ️ Skipping Trivy.") + + # 4) Dependency-Track (UI + analyzers) + if self._should_run_step( + self.config.run_dtrack, + "Do you want to start Dependency-Track and upload the BOM?", + ): + if bom_path is None: + print("❌ No BOM found (bom.json). Generate it before using Dependency-Track.") + if self.config.forced: + return 1 + else: + dtrack_ok = self.dependency_track.run( + bom_path=bom_path, + project_name=project_name, + project_version=project_version, + enable_trivy_analyzer=trivy_was_run, + api_port=self.config.dtrack_api_port, + ui_port=self.config.dtrack_ui_port, + api_url=self.config.dtrack_api_url, + api_key=None, + interactive=not self.config.forced, + ) + if not dtrack_ok and self.config.forced: + return 1 + else: + print("ℹ️ Skipping Dependency-Track.") + + self._print_summary() + return 0 + + # ---- internal helpers ---- + + def _should_run_step(self, enabled: bool, question: str, default: bool = True) -> bool: + if not enabled: + return False + if self.config.forced: + return True + return self.utils.ask_yes_no(question, default=default) + + def _resolve_app_root(self) -> str: + if self.config.app_root: + if self.config.app_root.is_dir(): + resolved = str(self.config.app_root) + print(f"Using application root from CLI: {resolved}") + return resolved + if self.config.forced: + raise ValueError(f"Provided app root is not a directory: {self.config.app_root}") + print(f"❌ Provided app root is not a directory: {self.config.app_root}") + if self.config.forced: + raise ValueError("In forced mode you must provide --path pointing to a valid directory.") + return self._ask_app_root() + + def _ask_app_root(self) -> str: + while True: + raw_path = input("Path to the application root ('.' for current directory): ").strip() or "." + app_root = str(Path(raw_path).resolve()) + if Path(app_root).is_dir(): + print(f"Using application root: {app_root}") + self.config.app_root = Path(app_root) + return app_root + print("❌ That path does not exist or is not a directory. Please try again.") + + def _detect_project_metadata( + self, + app_root: str, + require_name: bool, + require_version: bool, + allow_prompt: bool, + ) -> Tuple[str, str]: + """ + Detect project name and version from package metadata (e.g. package.json), + falling back to folder name and default version if not found. + + If the package metadata is missing and CLI overrides were not provided, + interactively request the values when allowed. In forced mode, missing + values will raise an error so the caller can abort early. + """ + default_name = Path(app_root).name + default_version = "1.0.0" + pkg_name, pkg_version = self.utils.read_package_metadata(app_root) + + project_name = pkg_name or None + project_version = pkg_version or None + + if project_name is None: + if require_name: + if allow_prompt: + prompt_default = default_name + user_name = input( + f"Project name for Dependency-Track [{prompt_default}]: " + ).strip() + project_name = user_name or prompt_default + else: + raise ValueError( + "Project name not found in package metadata. " + "Provide it via --project-name when running in forced mode." + ) + else: + project_name = default_name + + if project_version is None: + if require_version: + if allow_prompt: + prompt_default = default_version + user_version = input( + f"Project version for Dependency-Track [{prompt_default}]: " + ).strip() + project_version = user_version or prompt_default + else: + raise ValueError( + "Project version not found in package metadata. " + "Provide it via --project-version when running in forced mode." + ) + else: + project_version = default_version + + print(f"Detected project name: {project_name}") + print(f"Detected project version (or default): {project_version}") + return project_name, project_version + + def _print_summary(self) -> None: + self.utils.print_header("All steps completed") + dtrack_ui = self.config.dtrack_api_url or f"http://localhost:{self.config.dtrack_ui_port}" + print("Check the results in:") + print(" - Snyk: this terminal output") + print(f" - Dependency-Track UI: {dtrack_ui}") + print(" - Trivy: container output from 'aquasec/trivy:latest'") + print("\n✅ Security assistant finished.\n") + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Security Assistant - run Snyk, CycloneDX BOM, Trivy and Dependency-Track on a project" + ) + parser.add_argument( + "-p", + "--path", + dest="app_root", + metavar="APP_ROOT", + help="Path to the application root (if omitted, you'll be prompted interactively unless --forced is set).", + ) + parser.add_argument( + "--forced", + action="store_true", + help="Run in fully non-interactive mode (no prompts).", + ) + parser.add_argument("--no-snyk", action="store_true", help="Skip Snyk step.") + parser.add_argument("--no-bom", action="store_true", help="Skip BOM generation.") + parser.add_argument("--no-trivy", action="store_true", help="Skip Trivy scan.") + parser.add_argument("--no-dtrack", action="store_true", help="Skip Dependency-Track step.") + parser.add_argument("--node-version", help="Node.js version to use via nvm (e.g. 20.19.0).") + parser.add_argument( + "--dtrack-api-port", + help="Host port to expose the Dependency-Track API when running locally (default 8081).", + ) + parser.add_argument( + "--dtrack-ui-port", + help="Host port to expose the Dependency-Track UI when running locally (default 8080).", + ) + parser.add_argument( + "--dependency-track-server", + dest="dependency_track_server", + metavar="URL", + help=( + "Dependency-Track API base URL to use without prompting (for example http://localhost:8081). " + "If provided, the assistant will connect to that server instead of starting a local stack." + ), + ) + parser.add_argument("--project-name", help="Override project name for Dependency-Track.") + parser.add_argument("--project-version", help="Override project version for Dependency-Track.") + return parser.parse_args() + + +def main() -> int: + args = parse_args() + config = build_config(args) + assistant = SecurityAssistant(config) + return assistant.run() + + +if __name__ == "__main__": + try: + exit_code = main() + except KeyboardInterrupt: + print("\nInterrupted by user. Exiting...\n") + exit_code = 1 + sys.exit(exit_code) diff --git a/security-scanner/snyk_tool.py b/security-scanner/snyk_tool.py new file mode 100644 index 00000000..34f8c7a6 --- /dev/null +++ b/security-scanner/snyk_tool.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import os +from pathlib import Path + +from env_file import EnvFile +from utils import ConsoleUtils + + +class SnykTool: + """ + Best-effort Snyk runner: + 1) Try 'snyk test' immediately. + 2) If it fails, ensure the CLI is installed. + 3) Try to authenticate (non-interactive uses SNYK_TOKEN if present, otherwise skips). + 4) Run 'snyk test' again and return the result. + """ + + def __init__(self, utils: ConsoleUtils) -> None: + self.u = utils + + def run(self, app_root: str, non_interactive: bool = False) -> bool: + self.u.print_header("Step 1 - Snyk") + + if non_interactive: + assistant_root = Path(__file__).resolve().parent + EnvFile(assistant_root).load_to_environ(overwrite=False) + + # First attempt: run snyk test straight away + if self._run_snyk_test(app_root): + return True + + # Ensure CLI exists before retrying + if not self.u.find_executable("snyk"): + self.u.print_step("Snyk CLI not found. Trying to install via npm.") + if not self._install_snyk(): + print("❌ Could not install Snyk with 'npm install -g snyk'.") + return False + + # Attempt authentication if possible (skipped in forced mode without token) + auth_ok = True + if non_interactive: + snyk_token = os.environ.get("SNYK_TOKEN") + if snyk_token: + auth_ok = self._auth(token=snyk_token, interactive=False) + else: + print( + "Skipping 'snyk auth' in forced mode because SNYK_TOKEN is not set. " + "If the CLI is not already authenticated, 'snyk test' may still fail." + ) + else: + auth_ok = self._auth() + if not auth_ok: + print("❌ 'snyk auth' failed. Fix authentication and try again.") + return False + + if not auth_ok: + return False + + # Final attempt: run snyk test again + return self._run_snyk_test(app_root) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _auth(self, token: str | None = None, interactive: bool = True) -> bool: + """ + Run `snyk auth`. If a token is provided, use non-interactive auth; otherwise + perform the interactive login flow. + """ + if token: + self.u.print_step("Running 'snyk auth' with SNYK_TOKEN") + rc = self.u.run_command(["snyk", "auth", token]) + if rc != 0: + print("⚠️ 'snyk auth ' failed. Check the output above for details.") + return False + print("✅ 'snyk auth' completed successfully with provided token.") + return True + + self.u.print_step("Running 'snyk auth'") + if interactive: + print( + "This will open a browser or show a URL so you can log in to your Snyk account.\n" + "You usually only need to do this once per machine.\n" + ) + rc = self.u.run_command(["snyk", "auth"]) + if rc != 0: + print("⚠️ 'snyk auth' failed. Check the output above for details.") + return False + print("✅ 'snyk auth' completed successfully.") + return True + + def _install_snyk(self) -> bool: + """Try to install Snyk globally with npm; return True on success.""" + self.u.print_step("Installing Snyk with 'npm install -g snyk'") + rc = self.u.run_command(["npm", "install", "-g", "snyk"]) + if rc != 0: + print("⚠️ 'npm install -g snyk' failed. Check the output above for details.") + return False + print("✅ Snyk installation command finished (npm install -g snyk).") + return True + + def _run_snyk_test(self, app_root: str) -> bool: + """Run `snyk test` from the project root and report success/failure.""" + self.u.print_step("Running 'snyk test' from the project root") + rc = self.u.run_command(["snyk", "test"], cwd=app_root) + + if rc != 0: + print("⚠️ 'snyk test' finished with errors. Check the output above for details.") + return False + else: + print("✅ 'snyk test' completed successfully.") + return True diff --git a/security-scanner/trivy_tool.py b/security-scanner/trivy_tool.py new file mode 100644 index 00000000..f343808b --- /dev/null +++ b/security-scanner/trivy_tool.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import shutil +from pathlib import Path + +from utils import ConsoleUtils + + +class TrivyTool: + """Encapsulates Trivy SBOM scanning.""" + + def __init__(self, utils: ConsoleUtils) -> None: + self.u = utils + + def run(self, bom_path: Path) -> bool: + self.u.print_header("Step 3 - Trivy (scan BOM)") + + if not self.u.find_executable("docker"): + print("❌ 'docker' is not in PATH. Cannot run Trivy in a container.") + return False + + if not bom_path.is_file(): + print(f"❌ BOM file not found: {bom_path}. Generate bom.json before running Trivy.") + return False + + home = Path.home() + base_dir = home / "docker" / "trivy" + input_dir = base_dir / "input" + cache_dir = base_dir / "cache-scan" + + self.u.print_step(f"Preparing directories for Trivy under {base_dir}") + input_dir.mkdir(parents=True, exist_ok=True) + cache_dir.mkdir(parents=True, exist_ok=True) + + target_bom = input_dir / "bom.json" + self.u.print_step(f"Copying {bom_path} to {target_bom}") + shutil.copy2(bom_path, target_bom) + + self.u.print_step("Running Trivy on bom.json") + cmd = [ + "docker", + "run", + "--rm", + "--name", + "trivy", + "-v", + f"{cache_dir}:/root/.cache/", + "-v", + f"{input_dir}:/project", + "-w", + "/project", + "aquasec/trivy:latest", + "sbom", + "./bom.json", + ] + rc = self.u.run_command(cmd) + return rc == 0 + diff --git a/security-scanner/utils.py b/security-scanner/utils.py new file mode 100644 index 00000000..08297f6a --- /dev/null +++ b/security-scanner/utils.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import json +import os +import shutil +import subprocess +from pathlib import Path +from typing import Optional, Tuple + + +class ConsoleUtils: + """Common utilities for console interaction and helper functions.""" + + def __init__(self, interactive: bool = True) -> None: + self.interactive = interactive + + def print_header(self, title: str) -> None: + border = "=" * 80 + print(f"\n{border}\n{title}\n{border}\n") + + def print_step(self, message: str) -> None: + print(f"\n▶ {message}\n") + + def ask_yes_no(self, question: str, default: bool = True) -> bool: + """ + Ask for a yes/no answer on the terminal. + default=True -> [Y/n] + default=False -> [y/N] + """ + if not self.interactive: + return default + + while True: + suffix = " [Y/n]: " if default else " [y/N]: " + answer = input(question + suffix).strip().lower() + + if answer == "" and default is not None: + return default + if answer in ("y", "yes"): + return True + if answer in ("n", "no"): + return False + print("Please answer 'y' or 'n'.") + + def run_command(self, command: list[str], cwd: Optional[str] = None, confirm: bool = True) -> int: + """ + Run a command showing clearly what will be executed. + Pause before running so the user can see it. + """ + print("-" * 60) + print("About to run:") + print(" " + " ".join(command)) + if cwd: + print(f"Working directory: {cwd}") + print("-" * 60) + if self.interactive and confirm: + input("Press Enter to continue... (Ctrl+C to abort) ") + + try: + result = subprocess.run(command, cwd=cwd) + except FileNotFoundError: + print("❌ Command not found. Is it installed and in your PATH?") + return 127 + + if result.returncode == 0: + print("✅ Command finished successfully.") + else: + print(f"⚠️ Command exited with code {result.returncode}. Check the output above.") + return result.returncode + + @staticmethod + def find_executable(name: str) -> Optional[str]: + """Return the path to an executable or None if not found.""" + return shutil.which(name) + + @staticmethod + def load_env_file(path: str) -> None: + """ + Load a simple .env file (KEY=VALUE per line) into the current environment. + Lines starting with '#' or without '=' are ignored. + """ + env_path = Path(path) + if not env_path.is_file(): + print(f"⚠️ .env file not found: {env_path}") + return + + print(f"\n▶ Loading environment variables from {env_path}\n") + with env_path.open("r", encoding="utf-8") as f: + for raw_line in f: + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip() + value = value.strip().strip('"').strip("'") + os.environ[key] = value + print("✅ .env variables loaded into the current process.") + + @staticmethod + def read_package_metadata(app_root: str) -> Tuple[Optional[str], Optional[str]]: + """ + Try to read name and version from package.json. + Returns (name, version) or (None, None). + """ + pkg_path = Path(app_root) / "package.json" + if not pkg_path.is_file(): + return None, None + + try: + with pkg_path.open("r", encoding="utf-8") as f: + data = json.load(f) + return data.get("name"), data.get("version") + except Exception: + return None, None From 56c915cd7e43f3c580aafda0300d3298f1ed6407 Mon Sep 17 00:00:00 2001 From: idelcano Date: Fri, 21 Nov 2025 18:30:36 +0100 Subject: [PATCH 2/6] Added show dependencytrack warnings result on console too --- security-scanner/dependency_track_tool.py | 143 ++++++++++++++++++++-- 1 file changed, 132 insertions(+), 11 deletions(-) diff --git a/security-scanner/dependency_track_tool.py b/security-scanner/dependency_track_tool.py index cb8b35f3..1cd42d3c 100644 --- a/security-scanner/dependency_track_tool.py +++ b/security-scanner/dependency_track_tool.py @@ -2,8 +2,8 @@ import os from pathlib import Path -from typing import Optional -from urllib.parse import urlparse, urlunparse +from typing import Optional, Dict, Any +from urllib.parse import urlparse, urlunparse, quote import requests @@ -499,21 +499,20 @@ def _upload_bom_to_dependency_track( self.u.print_step("Preparing BOM upload to Dependency-Track") # Resolve API base URL (can come from parameters, environment or be the local default) - api_url = api_url_override or os.environ.get("DTRACK_API_URL") - if not api_url: + api_base_url = api_url_override or os.environ.get("DTRACK_API_URL") + if not api_base_url: if interactive: - api_url = f"http://localhost:{self.api_port}" + api_base_url = f"http://localhost:{self.api_port}" else: print( "❌ DTRACK_API_URL is required in forced mode. " "Set it in the environment or dependency_tracker_url in the assistant .env." ) return False - api_url = self._maybe_fix_ui_port(api_url) - print(f"Using Dependency-Track API base URL: {api_url}") - if not api_url.endswith("/api/v1/bom"): - api_url = api_url.rstrip("/") + "/api/v1/bom" - print(f"Full BOM endpoint: {api_url}") + api_base_url = self._maybe_fix_ui_port(api_base_url) + print(f"Using Dependency-Track API base URL: {api_base_url}") + bom_endpoint = api_base_url.rstrip("/") + "/api/v1/bom" + print(f"Full BOM endpoint: {bom_endpoint}") # 1) Try user-provided API key or environment variable api_key: Optional[str] = api_key_override or os.environ.get("DTRACK_API_KEY") @@ -560,7 +559,7 @@ def _upload_bom_to_dependency_track( with bom_path.open("rb") as bom_file: files = {"bom": (bom_path.name, bom_file, "application/json")} response = requests.post( - api_url, + bom_endpoint, data=data, headers=headers, files=files, @@ -572,6 +571,12 @@ def _upload_bom_to_dependency_track( if response.ok: print("✅ BOM uploaded successfully.") + self._print_vulnerability_summary( + api_base_url=api_base_url, + api_key=api_key, + project_name=project_name, + project_version=project_version, + ) return True print( @@ -579,3 +584,119 @@ def _upload_bom_to_dependency_track( f"{response.text}" ) return False + + # ------------------------------------------------------------------ + # Vulnerability summary + # ------------------------------------------------------------------ + + def _print_vulnerability_summary( + self, + api_base_url: str, + api_key: str, + project_name: str, + project_version: str, + ) -> None: + """ + Fetch vulnerability findings for the given project and print a compact summary. + """ + project = self._lookup_project(api_base_url, api_key, project_name, project_version) + if not project: + print("⚠️ Could not find project in Dependency-Track to fetch vulnerabilities.") + return + + findings = self._fetch_project_findings(api_base_url, api_key, project.get("uuid")) + if findings is None: + print("⚠️ Could not retrieve vulnerability findings for this project.") + return + + if not findings: + print("✅ No vulnerability findings reported for this project.") + return + + severity_counts: Dict[str, int] = {} + entries: list[str] = [] + + for finding in findings: + vulnerability: Dict[str, Any] = finding.get("vulnerability") or {} + severity = (vulnerability.get("severity") or "UNKNOWN").upper() + severity_counts[severity] = severity_counts.get(severity, 0) + 1 + + vuln_id = vulnerability.get("vulnId") or vulnerability.get("title") or vulnerability.get("uuid") or "Unknown" + component = finding.get("componentName") or (finding.get("component") or {}).get("name") or "Unknown component" + comp_version = finding.get("componentVersionName") or (finding.get("component") or {}).get("version") or "" + comp_display = f"{component} {comp_version}".strip() + entries.append(f"[{severity}] {vuln_id} in {comp_display}") + + self.u.print_step("Vulnerability findings from Dependency-Track") + print("Counts by severity:") + for sev in sorted(severity_counts.keys(), reverse=True): + print(f" - {sev}: {severity_counts[sev]}") + + print("\nFindings:") + for line in entries: + print(f" - {line}") + + def _lookup_project( + self, + api_base_url: str, + api_key: str, + project_name: str, + project_version: str, + ) -> Optional[Dict[str, Any]]: + """ + Resolve project metadata via the Dependency-Track lookup endpoint. + """ + url = ( + api_base_url.rstrip("/") + + f"/api/v1/project/lookup?name={quote(project_name)}&version={quote(project_version)}" + ) + try: + response = requests.get(url, headers={"X-Api-Key": api_key}, timeout=60) + except requests.RequestException as exc: + print(f"⚠️ Failed to look up project: {exc}") + return None + + if response.status_code == 404: + print("⚠️ Project not found in Dependency-Track (lookup returned 404).") + return None + if not response.ok: + print(f"⚠️ Project lookup failed with status {response.status_code}: {response.text}") + return None + + try: + return response.json() + except ValueError: + print("⚠️ Project lookup returned invalid JSON.") + return None + + def _fetch_project_findings( + self, + api_base_url: str, + api_key: str, + project_uuid: Optional[str], + ) -> Optional[list[Dict[str, Any]]]: + """ + Fetch vulnerability findings for the given project UUID. + """ + if not project_uuid: + return None + + url = api_base_url.rstrip("/") + f"/api/v1/finding/project/{project_uuid}?suppressed=false" + try: + response = requests.get(url, headers={"X-Api-Key": api_key}, timeout=120) + except requests.RequestException as exc: + print(f"⚠️ Failed to retrieve findings: {exc}") + return None + + if response.status_code == 404: + print("⚠️ Findings endpoint returned 404. The project may not exist or visibility is restricted.") + return None + if not response.ok: + print(f"⚠️ Findings request failed with status {response.status_code}: {response.text}") + return None + + try: + return response.json() + except ValueError: + print("⚠️ Findings response returned invalid JSON.") + return None From 6582a3ab36be105c45f71e03e372738481fb7485 Mon Sep 17 00:00:00 2001 From: idelcano Date: Fri, 21 Nov 2025 18:32:11 +0100 Subject: [PATCH 3/6] Move Snyk to the last step because we don't have an API key, so it's easier to see the result --- security-scanner/security_assistant.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/security-scanner/security_assistant.py b/security-scanner/security_assistant.py index 8cbde735..3c4ad398 100644 --- a/security-scanner/security_assistant.py +++ b/security-scanner/security_assistant.py @@ -82,18 +82,7 @@ def run(self) -> int: bom_path: Optional[Path] = None trivy_was_run = False - # 1) Snyk - if self._should_run_step( - self.config.run_snyk, - "Do you want to run Snyk (snyk test)?", - ): - snyk_ok = self.snyk.run(app_root, non_interactive=self.config.forced) - if not snyk_ok and self.config.forced: - return 1 - else: - print("ℹ️ Skipping Snyk.") - - # 2) BOM generation + # 1) BOM generation if self._should_run_step( self.config.run_bom, "Do you want to generate a CycloneDX BOM (bom.json)?", @@ -115,7 +104,7 @@ def run(self) -> int: if candidate.is_file(): bom_path = candidate - # 3) Trivy (CLI scan of the BOM) + # 2) Trivy (CLI scan of the BOM) if self._should_run_step( self.config.run_trivy, "Do you want to scan the BOM with Trivy?", @@ -159,6 +148,17 @@ def run(self) -> int: else: print("ℹ️ Skipping Dependency-Track.") + # 4) Snyk (run last so its output stays visible) + if self._should_run_step( + self.config.run_snyk, + "Do you want to run Snyk (snyk test)?", + ): + snyk_ok = self.snyk.run(app_root, non_interactive=self.config.forced) + if not snyk_ok and self.config.forced: + return 1 + else: + print("ℹ️ Skipping Snyk.") + self._print_summary() return 0 From 18c8e10d0ee1a91ec85a7e590ee891881df5d429 Mon Sep 17 00:00:00 2001 From: idelcano Date: Fri, 21 Nov 2025 18:53:21 +0100 Subject: [PATCH 4/6] added readme --- security-scanner/README.md | 63 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 security-scanner/README.md diff --git a/security-scanner/README.md b/security-scanner/README.md new file mode 100644 index 00000000..cf80ce14 --- /dev/null +++ b/security-scanner/README.md @@ -0,0 +1,63 @@ +# Security Assistant + +Security Assistant is a CLI that orchestrates multiple security steps for a project: +1) CycloneDX BOM generation +2) Trivy scan of the BOM +3) Dependency-Track upload and finding retrieval +4) Snyk scan (runs last so its output stays visible) + +It can run interactively or in fully non-interactive (`--forced`) mode. + +## Prerequisites +- Python 3 available on your PATH (invoked as `python3`). +- Docker + Docker Compose (for local Dependency-Track runs). +- `npm` available if Snyk needs to be installed automatically. +- Optional tokens: + - `DTRACK_API_KEY` (or stored via prompt) for Dependency-Track uploads/fetches. + - `SNYK_TOKEN` (optional); in forced mode Snyk will skip auth if no token is set. + +## Quick start example +Forced, remote Dependency-Track server, Node 20.19.0: +```bash +python3 security_assistant.py \ + -p /absolute/path/to/app/ \ + --node-version 20.19.0 \ + --dependency-track-server http://url:port/ \ + --forced +``` + +## Usage +```bash +python3 security_assistant.py [options] +``` + +Key options: +- `-p, --path APP_ROOT` Project root. +- `--forced` Fully non-interactive mode. +- `--node-version VERSION` Node version to use via nvm for BOM generation when no nvm default is configured. +- `--dependency-track-server URL` Use an existing Dependency-Track API base URL (skips local stack). +- `--show-dtrack-findings` Fetch Dependency-Track findings even if the upload step is skipped (e.g., existing BOM). +- `--project-name NAME` / `--project-version VERSION` Override detected metadata. +- `--no-bom` / `--no-trivy` / `--no-dtrack` / `--no-snyk` Skip specific steps. +- `--dtrack-api-port` / `--dtrack-ui-port` Ports when starting local Dependency-Track. + +Run `python3 security_assistant.py --help` for the full list. + +## Behavior notes +- Project metadata: name/version are read from `package.json`. If missing and no overrides are provided, you will be prompted (or forced mode will fail unless you supply them). +- Dependency-Track: + - If the BOM upload returns HTTP 409, the assistant treats it as “already exists” and shows findings from the existing BOM. + - `--show-dtrack-findings` lets you view findings without uploading a BOM in this run. + - If you pass a URL pointing to the UI port (8080), it will automatically switch to the API port (8081). +- Snyk: + - Runs last so its output remains visible. + - In forced mode, if `SNYK_TOKEN` is absent it skips auth and attempts the scan (may fail if the CLI is not already authenticated). +- Summary: the final summary prints the Dependency-Track URL you provided (or localhost defaults) along with Snyk/Trivy notes. + +## Typical flow +1) Generate `bom.json` (or reuse an existing one). +2) Scan the BOM with Trivy. +3) Upload BOM to Dependency-Track and fetch findings (or just fetch findings when requested). +4) Run `snyk test`. + +Exit codes are non-zero in forced mode when a required step fails. Duplicate BOMs (HTTP 409) are treated as soft success so findings can still be displayed. From 8806e4c7db4f7e5ef210836f7d00c953c5d23c41 Mon Sep 17 00:00:00 2001 From: idelcano Date: Fri, 21 Nov 2025 18:54:08 +0100 Subject: [PATCH 5/6] detect BOM upload status to be clear to see if it was pushed, already pushed and ignored, or has some error in the push --- security-scanner/dependency_track_tool.py | 186 ++++++++++++++-------- 1 file changed, 121 insertions(+), 65 deletions(-) diff --git a/security-scanner/dependency_track_tool.py b/security-scanner/dependency_track_tool.py index 1cd42d3c..47a216b5 100644 --- a/security-scanner/dependency_track_tool.py +++ b/security-scanner/dependency_track_tool.py @@ -251,6 +251,59 @@ def _maybe_fix_ui_port(self, url: str) -> str: return url + def _resolve_api_context( + self, + api_url_override: Optional[str], + api_key_override: Optional[str], + interactive: bool, + ) -> Optional[tuple[str, str]]: + """ + Resolve API base URL and API key, loading assistant .env as needed. + """ + assistant_root = Path(__file__).resolve().parent + env_file = EnvFile(assistant_root) + + # Load variables from the assistant's .env into the current environment, + # but do not override existing environment variables. + env_file.load_to_environ(overwrite=False) + + api_base_url = api_url_override or os.environ.get("DTRACK_API_URL") + if not api_base_url: + if interactive: + api_base_url = f"http://localhost:{self.api_port}" + else: + print( + "❌ DTRACK_API_URL is required in forced mode. " + "Set it in the environment or dependency_tracker_url in the assistant .env." + ) + return None + api_base_url = self._maybe_fix_ui_port(api_base_url) + + # 1) Try user-provided API key or environment variable + api_key: Optional[str] = api_key_override or os.environ.get("DTRACK_API_KEY") + + # 2) Try token from the assistant's .env under a reusable key + if not api_key: + token_from_file = env_file.read_key("dependency_tracker_token") + if token_from_file: + print("Loading Dependency-Track API token from assistant .env (dependency_tracker_token)") + api_key = token_from_file + + # 3) Ask user as last resort and persist in the assistant's .env for future runs + if not api_key and interactive: + api_key = input("Enter Dependency-Track API Key (stored locally for reuse): ").strip() + if api_key: + env_file.write_key("dependency_tracker_token", api_key) + + if not api_key: + print( + "❌ Dependency-Track API key is required. " + "In forced mode configure DTRACK_API_KEY or dependency_tracker_token in the assistant .env." + ) + return None + + return api_base_url, api_key + # ------------------------------------------------------------------ # Ports (local mode only) # ------------------------------------------------------------------ @@ -475,68 +528,24 @@ def _upload_bom_to_dependency_track( """ Upload the generated BOM to Dependency-Track using the requests library. - The API key is resolved in this order: - 1) Environment variable DTRACK_API_KEY (if set). - 2) Local .env file next to this assistant (key: dependency_tracker_token). - 3) Interactive prompt; if provided, it is persisted in that .env file. - - The API URL is resolved as follows: - - Environment variable DTRACK_API_URL (if set; can be a base URL). - - If not set, defaults to http://localhost:. - - If the URL does not already end with '/api/v1/bom', this method - will append '/api/v1/bom' to construct the final endpoint. + A duplicate BOM (HTTP 409) is treated as a soft success: we still fetch findings + and clearly state that the BOM was already present. Any other failure is fatal. """ - # Assistant root directory: where this script (and the rest of the assistant) lives. - # We store a reusable .env file here so the API key is global for the assistant, - # not per scanned project. - assistant_root = Path(__file__).resolve().parent - env_file = EnvFile(assistant_root) - - # Load variables from the assistant's .env into the current environment, - # but do not override existing environment variables. - env_file.load_to_environ(overwrite=False) + ctx = self._resolve_api_context( + api_url_override=api_url_override, + api_key_override=api_key_override, + interactive=interactive, + ) + if ctx is None: + return False + api_base_url, api_key = ctx self.u.print_step("Preparing BOM upload to Dependency-Track") - # Resolve API base URL (can come from parameters, environment or be the local default) - api_base_url = api_url_override or os.environ.get("DTRACK_API_URL") - if not api_base_url: - if interactive: - api_base_url = f"http://localhost:{self.api_port}" - else: - print( - "❌ DTRACK_API_URL is required in forced mode. " - "Set it in the environment or dependency_tracker_url in the assistant .env." - ) - return False - api_base_url = self._maybe_fix_ui_port(api_base_url) - print(f"Using Dependency-Track API base URL: {api_base_url}") bom_endpoint = api_base_url.rstrip("/") + "/api/v1/bom" + print(f"Using Dependency-Track API base URL: {api_base_url}") print(f"Full BOM endpoint: {bom_endpoint}") - # 1) Try user-provided API key or environment variable - api_key: Optional[str] = api_key_override or os.environ.get("DTRACK_API_KEY") - - # 2) Try token from the assistant's .env under a reusable key - if not api_key: - token_from_file = env_file.read_key("dependency_tracker_token") - if token_from_file: - print("Loading Dependency-Track API token from assistant .env (dependency_tracker_token)") - api_key = token_from_file - - # 3) Ask user as last resort and persist in the assistant's .env for future runs - if not api_key and interactive: - api_key = input("Enter Dependency-Track API Key (leave empty to skip upload): ").strip() - if api_key: - env_file.write_key("dependency_tracker_token", api_key) - - if not api_key: - print( - "❌ Dependency-Track API key is required. " - "In forced mode configure DTRACK_API_KEY or dependency_tracker_token in the assistant .env." - ) - return False - self.u.print_step("Project details for Dependency-Track") print(f"Detected project name: {project_name}") print(f"Detected project version: {project_version}") @@ -569,21 +578,65 @@ def _upload_bom_to_dependency_track( print(f"⚠️ Failed to upload BOM: {exc}") return False + duplicate = response.status_code == 409 + if response.ok: print("✅ BOM uploaded successfully.") - self._print_vulnerability_summary( - api_base_url=api_base_url, - api_key=api_key, - project_name=project_name, - project_version=project_version, + summary_note = "Findings below reflect the freshly uploaded BOM." + elif duplicate: + print( + "ℹ️ BOM already exists in Dependency-Track (HTTP 409). " + "No new upload performed; showing findings from the existing BOM." + ) + summary_note = "Findings below come from the existing BOM (no new upload)." + else: + print( + f"⚠️ Dependency-Track responded with status {response.status_code}: " + f"{response.text}" ) - return True + return False - print( - f"⚠️ Dependency-Track responded with status {response.status_code}: " - f"{response.text}" + self._print_vulnerability_summary( + api_base_url=api_base_url, + api_key=api_key, + project_name=project_name, + project_version=project_version, + source_note=summary_note, + ) + return True + + # ------------------------------------------------------------------ + # Findings-only entry point + # ------------------------------------------------------------------ + + def show_findings( + self, + project_name: str, + project_version: str, + api_url_override: Optional[str], + api_key_override: Optional[str], + interactive: bool, + ) -> bool: + """ + Fetch and print vulnerability findings for an existing project without uploading a BOM. + """ + ctx = self._resolve_api_context( + api_url_override=api_url_override, + api_key_override=api_key_override, + interactive=interactive, ) - return False + if ctx is None: + return False + api_base_url, api_key = ctx + + self._print_vulnerability_summary( + api_base_url=api_base_url, + api_key=api_key, + project_name=project_name, + project_version=project_version, + source_note="Findings below were fetched without uploading a BOM in this run.", + ) + return True # ------------------------------------------------------------------ # Vulnerability summary @@ -595,6 +648,7 @@ def _print_vulnerability_summary( api_key: str, project_name: str, project_version: str, + source_note: Optional[str] = None, ) -> None: """ Fetch vulnerability findings for the given project and print a compact summary. @@ -628,6 +682,8 @@ def _print_vulnerability_summary( entries.append(f"[{severity}] {vuln_id} in {comp_display}") self.u.print_step("Vulnerability findings from Dependency-Track") + if source_note: + print(source_note) print("Counts by severity:") for sev in sorted(severity_counts.keys(), reverse=True): print(f" - {sev}: {severity_counts[sev]}") From b63b4a99c3c728759ababfac8dbf1afcd9b06418 Mon Sep 17 00:00:00 2001 From: idelcano Date: Fri, 21 Nov 2025 18:55:33 +0100 Subject: [PATCH 6/6] add findings only to check the BOM status on dependency track --- security-scanner/security_assistant.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/security-scanner/security_assistant.py b/security-scanner/security_assistant.py index 3c4ad398..f2e40fb6 100644 --- a/security-scanner/security_assistant.py +++ b/security-scanner/security_assistant.py @@ -25,6 +25,7 @@ class SecurityConfig: dtrack_api_port: str dtrack_ui_port: str dtrack_api_url: Optional[str] + dtrack_show_findings: bool project_name: Optional[str] project_version: Optional[str] @@ -43,6 +44,7 @@ def build_config(args: argparse.Namespace) -> SecurityConfig: dtrack_api_port=args.dtrack_api_port or "8081", dtrack_ui_port=args.dtrack_ui_port or "8080", dtrack_api_url=args.dependency_track_server, + dtrack_show_findings=args.show_dtrack_findings, project_name=args.project_name, project_version=args.project_version, ) @@ -81,6 +83,7 @@ def run(self) -> int: bom_path: Optional[Path] = None trivy_was_run = False + dtrack_ok = False # 1) BOM generation if self._should_run_step( @@ -122,7 +125,7 @@ def run(self) -> int: else: print("ℹ️ Skipping Trivy.") - # 4) Dependency-Track (UI + analyzers) + # 3) Dependency-Track (UI + analyzers) if self._should_run_step( self.config.run_dtrack, "Do you want to start Dependency-Track and upload the BOM?", @@ -148,6 +151,18 @@ def run(self) -> int: else: print("ℹ️ Skipping Dependency-Track.") + # 3b) Findings-only (fetch vulnerabilities even if BOM upload was skipped) + if self.config.dtrack_show_findings and not dtrack_ok: + findings_ok = self.dependency_track.show_findings( + project_name=project_name, + project_version=project_version, + api_url_override=self.config.dtrack_api_url, + api_key_override=None, + interactive=not self.config.forced, + ) + if not findings_ok and self.config.forced: + return 1 + # 4) Snyk (run last so its output stays visible) if self._should_run_step( self.config.run_snyk, @@ -300,6 +315,11 @@ def parse_args() -> argparse.Namespace: "If provided, the assistant will connect to that server instead of starting a local stack." ), ) + parser.add_argument( + "--show-dtrack-findings", + action="store_true", + help="Fetch and display Dependency-Track findings even if the BOM upload step is skipped.", + ) parser.add_argument("--project-name", help="Override project name for Dependency-Track.") parser.add_argument("--project-version", help="Override project version for Dependency-Track.") return parser.parse_args()