From e63d0acf89e3fe9b2e1990ffbc55670b718ba07c Mon Sep 17 00:00:00 2001 From: mitchtuininga Date: Sun, 21 Sep 2025 15:34:40 +1000 Subject: [PATCH] Add initial Asset Assessment Scanner implementation Introduces the Asset Assessment Scanner tool with core modules for file handling, OCR, pattern-based sensitive data detection, and reporting. Includes configuration files for regex patterns and risk rules, as well as documentation and requirements for setup and usage. --- Asset-Assessment-Scanner-V1/README.md | 136 +++++++++++++ Asset-Assessment-Scanner-V1/file_handler.py | 30 +++ Asset-Assessment-Scanner-V1/ocr_engine.py | 92 +++++++++ Asset-Assessment-Scanner-V1/patterns.json | 102 ++++++++++ Asset-Assessment-Scanner-V1/reporter.py | 114 +++++++++++ Asset-Assessment-Scanner-V1/requirements.txt | 17 ++ Asset-Assessment-Scanner-V1/risk_rules.json | 180 +++++++++++++++++ Asset-Assessment-Scanner-V1/scan_media.py | 26 +++ Asset-Assessment-Scanner-V1/scanner.py | 194 +++++++++++++++++++ 9 files changed, 891 insertions(+) create mode 100644 Asset-Assessment-Scanner-V1/README.md create mode 100644 Asset-Assessment-Scanner-V1/file_handler.py create mode 100644 Asset-Assessment-Scanner-V1/ocr_engine.py create mode 100644 Asset-Assessment-Scanner-V1/patterns.json create mode 100644 Asset-Assessment-Scanner-V1/reporter.py create mode 100644 Asset-Assessment-Scanner-V1/requirements.txt create mode 100644 Asset-Assessment-Scanner-V1/risk_rules.json create mode 100644 Asset-Assessment-Scanner-V1/scan_media.py create mode 100644 Asset-Assessment-Scanner-V1/scanner.py diff --git a/Asset-Assessment-Scanner-V1/README.md b/Asset-Assessment-Scanner-V1/README.md new file mode 100644 index 0000000..54d37a6 --- /dev/null +++ b/Asset-Assessment-Scanner-V1/README.md @@ -0,0 +1,136 @@ +# Redback Ethics Asset Scanner + +The **Asset Scanner** is a Python-based tool for detecting sensitive information (PII, secrets, credentials, etc.) in documents and media. 
+It is designed for educational use in cybersecurity and ethics modules.
+
+---
+
+## 📂 Project Structure
+
+- `scanner.py` – Main entry point for scanning files and generating reports.
+- `scan_media.py` – Scans image/PDF inputs using OCR (`ocr_engine.py`).
+- `file_handler.py` – Handles input files and preprocessing.
+- `ocr_engine.py` – OCR engine wrapper for text extraction from images.
+- `reporter.py` – Builds structured scan results and output reports.
+- `patterns.json` – Regex patterns for detecting sensitive items.
+- `risk_rules.json` – Maps detected patterns to risk levels, compliance references, and remediation tips.
+
+---
+
+## ⚙️ Setup
+
+1. Clone the repository:
+   ```bash
+   git clone https://github.com/<your-username>/redback-ethics.git
+   cd redback-ethics/asset-scanner
+   ```
+
+2. Create and activate a virtual environment:
+   ```bash
+   python3 -m venv .venv
+   source .venv/bin/activate
+   ```
+
+3. Install dependencies:
+   ```bash
+   pip install -r requirements.txt
+   ```
+
+---
+
+## 🚀 Usage
+
+To scan a document:
+```bash
+python scanner.py --file "/path/to/document.docx"
+```
+
+To scan an image or PDF (OCR enabled):
+```bash
+python scan_media.py --file "/path/to/image_or_pdf"
+```
+
+To scan a directory:
+```bash
+python scanner.py --root "/path/to/folder"
+```
+OR
+if you run `scanner.py` standalone without any `--file` or `--root` arguments, you will be prompted
+to enter a directory at runtime.
+
+Output will include:
+- Detected matches with line context
+- Risk level (from `risk_rules.json`)
+- Mitigation tips and relevant compliance frameworks
+
+---
+
+## ⚡ Command-Line Arguments
+
+The scanner supports several arguments to control input and behaviour:
+
+| Argument | Type | Description | Example |
+|----------|------|-------------|---------|
+| `--file` | Path | Scan a single file (e.g., `.docx`, `.pdf`, `.png`). | `python scanner.py --file "/path/to/document.docx"` |
+| `--root` | Path | Recursively scan all files within a directory.
| `python scanner.py --root "/path/to/folder"` |
+| `--patterns` | Path | Custom path to `patterns.json`. Useful if you want to override defaults. | `python scanner.py --file test.docx --patterns ./configs/patterns.json` |
+| `--out` | Path | File to write structured scan results (JSON or text depending on implementation). | `python scanner.py --root ./docs --out results.json` |
+| `--no-console` | Flag | Suppress console output. Results will only be written to the output file. | `python scanner.py --root ./docs --no-console --out results.json` |
+
+### Common Usage Examples
+
+Scan one file:
+```bash
+python scanner.py --file "/Users/alice/Documents/report.docx"
+```
+
+Recursively scan a directory:
+```bash
+python scanner.py --root "/Users/alice/Documents/sensitive_documents"
+```
+
+---
+
+## 🛡️ Configuration
+
+- **`patterns.json`**: Defines regex patterns for items like emails, API keys, driver’s licence numbers, etc.
+  Each entry specifies:
+  - `pattern`: regex string
+  - `risk`: risk level
+  - `description`: human-readable explanation
+
+- **`risk_rules.json`**: Associates each pattern with:
+  - `level`: severity (Low/Medium/High)
+  - `tip`: recommended mitigation
+  - `compliance`: legal/regulatory references
+
+You can extend these files to detect new types of data.
+
+---
+
+## 📝 Example
+
+Scanning a document containing:
+
+```
+Email: alice@example.com
+Password: "SuperSecret123"
+```
+
+Would output:
+
+```
+[Email] -> Medium Risk
+Tip: Mask or obfuscate emails in logs/code unless strictly required.
+Compliance: Privacy Act 1988 (Cth) — APP 11
+
+[Password] -> High Risk
+Tip: Remove hard-coded passwords; rotate immediately; use env vars or a vault.
+Compliance: GDPR Art. 32 — Security of processing
+```
+
+---
+
+## 🔒 Notes
+
+- Regex-based scanning may produce **false positives**; tune `patterns.json` to your needs.
diff --git a/Asset-Assessment-Scanner-V1/file_handler.py b/Asset-Assessment-Scanner-V1/file_handler.py new file mode 100644 index 0000000..abd571c --- /dev/null +++ b/Asset-Assessment-Scanner-V1/file_handler.py @@ -0,0 +1,30 @@ +import os +from docx import Document +# Import extract_text_from_file for PDF and image support +from scan_media import extract_text_from_file + +def find_files(directory, exts=None): + exts = exts or [] + matches = [] + for dirpath, _, filenames in os.walk(directory): + for fn in filenames: + if not exts or any(fn.lower().endswith(e) for e in exts): + matches.append(os.path.join(dirpath, fn)) + return matches + +def read_file(path): + lower_path = path.lower() + if lower_path.endswith('.docx'): + try: + doc = Document(path) + return '\n'.join([p.text for p in doc.paragraphs]) + except Exception as e: + return f"[Error reading DOCX: {e}]" + elif lower_path.endswith('.pdf') or lower_path.endswith(('.png', '.jpg', '.jpeg', '.tiff', '.tif', '.bmp', '.webp')): + try: + return extract_text_from_file(path) + except Exception as e: + return f"[Error extracting text from media: {e}]" + else: + with open(path, encoding="utf-8", errors="ignore") as f: + return f.read() diff --git a/Asset-Assessment-Scanner-V1/ocr_engine.py b/Asset-Assessment-Scanner-V1/ocr_engine.py new file mode 100644 index 0000000..ea314a5 --- /dev/null +++ b/Asset-Assessment-Scanner-V1/ocr_engine.py @@ -0,0 +1,92 @@ +from __future__ import annotations +from dataclasses import dataclass +from typing import List, Optional, Tuple +from pathlib import Path +import re + +import numpy as np +from PIL import Image +import pytesseract +import cv2 + +try: + from pdf2image import convert_from_path + PDF2IMAGE_AVAILABLE = True +except Exception: + PDF2IMAGE_AVAILABLE = False + +@dataclass +class OCRConfig: + dpi: int = 300 + deskew: bool = True + binarize: bool = True + oem: int = 3 + psm: int = 3 + lang: str = "eng" + +def _to_cv(img: Image.Image) -> np.ndarray: + return 
cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR) + +def _to_pil(arr: np.ndarray) -> Image.Image: + return Image.fromarray(cv2.cvtColor(arr, cv2.COLOR_BGR2RGB)) + +def _normalize_dpi(img: Image.Image, target_dpi: int) -> Image.Image: + dpi = img.info.get("dpi", (target_dpi, target_dpi))[0] + if dpi < target_dpi: + scale = target_dpi / dpi + new_size = (int(img.width * scale), int(img.height * scale)) + img = img.resize(new_size, Image.LANCZOS) + img.info["dpi"] = (target_dpi, target_dpi) + return img + +def _deskew(cv_img: np.ndarray) -> np.ndarray: + gray = cv2.cvtColor(cv_img, cv2.COLOR_BGR2GRAY) + gray = cv2.bitwise_not(gray) + thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)[1] + coords = np.column_stack(np.where(thresh > 0)) + if coords.size == 0: + return cv_img + angle = cv2.minAreaRect(coords)[-1] + if angle < -45: + angle = -(90 + angle) + else: + angle = -angle + (h, w) = cv_img.shape[:2] + M = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1.0) + rotated = cv2.warpAffine(cv_img, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE) + return rotated + +def _binarize(cv_img: np.ndarray) -> np.ndarray: + gray = cv2.cvtColor(cv_img, cv2.COLOR_BGR2GRAY) + thr = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, + cv2.THRESH_BINARY, 35, 11) + return cv2.cvtColor(thr, cv2.COLOR_GRAY2BGR) + +def preprocess_image(img: Image.Image, cfg: OCRConfig) -> Image.Image: + img = _normalize_dpi(img, cfg.dpi) + cv_img = _to_cv(img) + if cfg.deskew: + cv_img = _deskew(cv_img) + if cfg.binarize: + cv_img = _binarize(cv_img) + return _to_pil(cv_img) + +def _tesseract_args(cfg: OCRConfig) -> str: + return f"--oem {cfg.oem} --psm {cfg.psm}" + +def ocr_image(img: Image.Image, cfg: Optional[OCRConfig] = None) -> str: + cfg = cfg or OCRConfig() + img_p = preprocess_image(img, cfg) + text = pytesseract.image_to_string(img_p, lang=cfg.lang, config=_tesseract_args(cfg)) + return text.strip() + +def pdf_to_images(pdf_path: str | Path, 
dpi: int = 300) -> List[Image.Image]: + if not PDF2IMAGE_AVAILABLE: + raise RuntimeError("pdf2image not available or poppler missing.") + return convert_from_path(str(pdf_path), dpi=dpi) + +def ocr_pdf(pdf_path: str | Path, cfg: Optional[OCRConfig] = None) -> Tuple[str, List[str]]: + cfg = cfg or OCRConfig() + pages = pdf_to_images(pdf_path, dpi=cfg.dpi) + page_texts = [ocr_image(p, cfg) for p in pages] + return "\n".join(page_texts), page_texts diff --git a/Asset-Assessment-Scanner-V1/patterns.json b/Asset-Assessment-Scanner-V1/patterns.json new file mode 100644 index 0000000..3c80dc3 --- /dev/null +++ b/Asset-Assessment-Scanner-V1/patterns.json @@ -0,0 +1,102 @@ +{ + "email": { + "pattern": "[a-zA-Z0-9+._%-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,63}", + "risk": "Medium", + "description": "Email address" + }, + "aws_access_key": { + "pattern": "\\bAKIA[0-9A-Z]{16}\\b", + "risk": "High", + "description": "AWS Access Key" + }, + "aws_secret_access_key": { + "pattern": "(? Dict[str, Dict[str, Any]]: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + +def _primary_law(compliance: Optional[List[str]]) -> str: + """Return a single primary law label (first in list) or a sensible default.""" + if isinstance(compliance, list) and compliance: + return str(compliance[0]) + return "General Best Practice" + +def _as_list(v) -> List[str]: + if v is None: + return [] + return v if isinstance(v, list) else [v] + +def _enrich_findings(findings: Iterable[Dict[str, Any]], + risk_rules: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Merge upstream scanner findings with Stream 4 risk rules. + Expected incoming fields (best effort): pattern, file, line, match or raw, description (optional). + Returns enriched records with: risk, tip, compliance (array), law (primary), raw (unredacted). 
+ """ + enriched: List[Dict[str, Any]] = [] + for f in findings: + pid = f.get("pattern") + rr = risk_rules.get(pid, {}) + level = (rr.get("level") or "Low").title() # "High" / "Low" + tip = rr.get("tip") or "Follow secure handling and removal procedures." + comp_list = _as_list(rr.get("compliance")) + law = _primary_law(comp_list) + + enriched.append({ + "pattern": pid, + "description": f.get("description"), + "file": f.get("file"), + "line": f.get("line"), + "risk": level, + "tip": tip, + "law": law, # primary law for convenience + "compliance": comp_list, # full list of exact law references + "raw": f.get("raw") or f.get("match") or "" # keep unredacted in JSON + }) + return enriched + +# ---------- Public APIs ---------- + +def write_report(findings: Iterable[Dict[str, Any]], + out_path: str = "scan_report.json", + risk_rules_path: str = DEFAULT_RISK_RULES_PATH) -> List[Dict[str, Any]]: + """ + Write enriched JSON report to disk. + Includes fields: pattern, file, line, risk, tip, law, compliance[], raw (unredacted). + Returns the enriched list (handy if the caller also wants to print console). + """ + risk_rules = _load_risk_rules(risk_rules_path) + enriched = _enrich_findings(findings, risk_rules) + with open(out_path, "w", encoding="utf-8") as f: + json.dump(enriched, f, indent=2) + print(f"[+] Report saved to {out_path}") + return enriched + +def generate_json_report(findings: Iterable[Dict[str, Any]], + risk_rules_path: str = DEFAULT_RISK_RULES_PATH) -> str: + """ + Return the enriched JSON as a string (does not write to disk). + """ + risk_rules = _load_risk_rules(risk_rules_path) + enriched = _enrich_findings(findings, risk_rules) + return json.dumps(enriched, indent=2) + +def generate_console_report(findings: Iterable[Dict[str, Any]], + risk_rules_path: str = DEFAULT_RISK_RULES_PATH) -> None: + """ + Console summary grouped by risk bucket with masked snippets. 
+ Columns per item: Risk | File:Line | Pattern | Tip + """ + risk_rules = _load_risk_rules(risk_rules_path) + enriched = _enrich_findings(findings, risk_rules) + + # Overall counts (for header summaries) + totals = Counter(e["risk"] for e in enriched) + + for bucket in ("High", "Low"): # deterministic order + items = [e for e in enriched if e["risk"] == bucket] + if not items: + continue + print(f"\n=== {bucket} Risk ({len(items)} findings) ===") + print(f"Summary: High: {totals.get('High',0)}, Low: {totals.get('Low',0)}") + + # sort by file then line then pattern for stable output + def _key(x: Dict[str, Any]): + return (str(x.get("file") or ""), int(x.get("line") or 0), str(x.get("pattern") or "")) + for e in sorted(items, key=_key): + file_line = f"{e.get('file') or ''}:{e.get('line') or '?'}" + pattern = e.get("pattern") or "unknown" + tip = e.get("tip") or "" + print(f"\nβ€’ Risk {bucket.upper()} | {file_line} | {pattern} | Tip: {tip}") + print(f" β†’ {REDACTION_TOKEN}") # ALWAYS mask in console \ No newline at end of file diff --git a/Asset-Assessment-Scanner-V1/requirements.txt b/Asset-Assessment-Scanner-V1/requirements.txt new file mode 100644 index 0000000..0004b20 --- /dev/null +++ b/Asset-Assessment-Scanner-V1/requirements.txt @@ -0,0 +1,17 @@ +defusedxml==0.7.1 +Faker==37.1.0 +fonttools==4.57.0 +fpdf2==2.8.3 +lxml==5.4.0 +numpy==2.0.2 +opencv-python==4.12.0.88 +packaging==25.0 +pandas==2.2.3 +pillow==11.2.1 +pytesseract==0.3.13 +python-dateutil==2.9.0.post0 +python-docx==1.1.2 +pytz==2025.2 +six==1.17.0 +typing_extensions==4.13.2 +tzdata==2025.2 diff --git a/Asset-Assessment-Scanner-V1/risk_rules.json b/Asset-Assessment-Scanner-V1/risk_rules.json new file mode 100644 index 0000000..cac2fc2 --- /dev/null +++ b/Asset-Assessment-Scanner-V1/risk_rules.json @@ -0,0 +1,180 @@ +{ + "email": { + "level": "Low", + "tip": "Mask or obfuscate emails in logs/code unless strictly required; avoid storing in repos.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 
11 (Security of personal information)", + "GDPR Art. 5(1)(c) β€” Data minimisation", + "GDPR Art. 32 β€” Security of processing", + "GDPR Recital 30 β€” Online identifiers" + ] + }, + "aws_access_key": { + "level": "High", + "tip": "Rotate immediately; revoke if exposed; move to a secrets manager; purge from history.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11 (Security of personal information)", + "Privacy Act 1988 (Cth) β€” Notifiable Data Breaches (NDB) scheme, Part IIIC", + "GDPR Art. 32 β€” Security of processing" + ] + }, + "aws_secret_access_key": { + "level": "High", + "tip": "Revoke/rotate immediately; store only in a managed secrets vault; purge from history.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11", + "Privacy Act 1988 (Cth) β€” NDB scheme, Part IIIC", + "GDPR Art. 32 β€” Security of processing" + ] + }, + "gcp_service_account_key": { + "level": "High", + "tip": "Recreate the key; restrict IAM; store in Google Secret Manager; remove from repo history.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11", + "Privacy Act 1988 (Cth) β€” NDB scheme, Part IIIC", + "GDPR Art. 32 β€” Security of processing" + ] + }, + "azure_client_secret": { + "level": "High", + "tip": "Rotate the client secret; use Azure Key Vault; remove from source control history.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11", + "Privacy Act 1988 (Cth) β€” NDB scheme, Part IIIC", + "GDPR Art. 32 β€” Security of processing" + ] + }, + "ssh_private_key": { + "level": "High", + "tip": "Remove private keys from repos; rotate dependent keys; store via SSH agent or vault.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11", + "Privacy Act 1988 (Cth) β€” NDB scheme, Part IIIC", + "GDPR Art. 
32 β€” Security of processing" + ] + }, + "jwt_secret": { + "level": "High", + "tip": "Rotate signing secrets/tokens; never commit to code; load from env or a secrets store.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11", + "Privacy Act 1988 (Cth) β€” NDB scheme, Part IIIC", + "GDPR Art. 32 β€” Security of processing" + ] + }, + "api_token": { + "level": "High", + "tip": "Regenerate and scope minimally; store in a secrets manager; remove from history.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11", + "Privacy Act 1988 (Cth) β€” NDB scheme, Part IIIC", + "GDPR Art. 32 β€” Security of processing" + ] + }, + "password": { + "level": "High", + "tip": "Remove hard-coded passwords; rotate immediately; use env vars or a vault.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11", + "Privacy Act 1988 (Cth) β€” NDB scheme, Part IIIC", + "GDPR Art. 32 β€” Security of processing" + ] + }, + "credit_card": { + "level": "High", + "tip": "Never store card numbers in code; use a tokenisation/payment provider; treat as an incident if exposed.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11", + "Privacy Act 1988 (Cth) β€” Notifiable Data Breaches (NDB) scheme, Part IIIC", + "GDPR Art. 32 β€” Security of processing", + "GDPR Art. 33/34 β€” Personal data breach notification" + ] + }, + "ssn": { + "level": "High", + "tip": "Remove or mask SSNs; handle only within strictly controlled, compliant systems.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11", + "GDPR Art. 32 β€” Security of processing", + "California Civil Code Β§ 1798.85 β€” SSN confidentiality (if applicable)" + ] + }, + "phone_number": { + "level": "Low", + "tip": "Obfuscate where possible; avoid logging full numbers; limit retention.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11", + "GDPR Art. 5(1)(c) β€” Data minimisation", + "GDPR Art. 
32 β€” Security of processing" + ] + }, + "ip_address": { + "level": "Low", + "tip": "Anonymise or truncate IPs where feasible; avoid storing long-term.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11", + "GDPR Art. 4(1) β€” Personal data definition (when identifiable)", + "GDPR Recital 30 β€” Online identifiers", + "GDPR Art. 32 β€” Security of processing" + ] + }, + "database_connection_string": { + "level": "High", + "tip": "Remove credentials from code; use env vars or a secrets manager; rotate compromised passwords.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11", + "Privacy Act 1988 (Cth) β€” NDB scheme, Part IIIC", + "GDPR Art. 32 β€” Security of processing" + ] + }, + "tfn": { + "level": "High", + "tip": "Never store TFNs in source or logs; restrict access; remove from repo history; secure at rest and in transit.", + "compliance": [ + "Privacy (Tax File Number) Rule 2015 (Cth)", + "Privacy Act 1988 (Cth) β€” APP 9 (Government related identifiers)", + "Privacy Act 1988 (Cth) β€” APP 11 (Security of personal information)", + "Privacy Act 1988 (Cth) β€” Notifiable Data Breaches (Part IIIC)" + ] + }, + "medicare_number": { + "level": "High", + "tip": "Treat Medicare numbers as sensitive; minimise collection; avoid code/logs; secure storage and transmission.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 9 (Government related identifiers)", + "Privacy Act 1988 (Cth) β€” APP 11 (Security of personal information)", + "Privacy Act 1988 (Cth) β€” Notifiable Data Breaches (Part IIIC)" + ] + }, + "drivers_licence_number": { + "level": "High", + "tip": "Do not commit to repos; mask in logs; restrict access; rotate/replace if exposed per jurisdictional guidance.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 9 (Government related identifiers)", + "Privacy Act 1988 (Cth) β€” APP 11 (Security of personal information)", + "Privacy Act 1988 (Cth) β€” Notifiable Data Breaches (Part IIIC)" + ] + }, + "address_au_with_pii": { + "level": "High", 
+ "tip": "Treat full addresses combined with other identifiers as sensitive; mask or remove; restrict access and retention.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11 (Security of personal information)", + "GDPR Art. 4(1) β€” Personal data", + "GDPR Art. 5(1)(c) β€” Data minimisation", + "GDPR Art. 32 β€” Security of processing", + "Privacy Act 1988 (Cth) β€” Notifiable Data Breaches (Part IIIC)" + ] + }, + "name_full": { + "level": "Low", + "tip": "Mask or omit full names in code/logs unless strictly required; minimise collection and retention.", + "compliance": [ + "Privacy Act 1988 (Cth) β€” APP 11 (Security of personal information)", + "GDPR Art. 4(1) β€” Personal data", + "GDPR Art. 5(1)(c) β€” Data minimisation", + "GDPR Art. 32 β€” Security of processing" + ] + } +} \ No newline at end of file diff --git a/Asset-Assessment-Scanner-V1/scan_media.py b/Asset-Assessment-Scanner-V1/scan_media.py new file mode 100644 index 0000000..735e226 --- /dev/null +++ b/Asset-Assessment-Scanner-V1/scan_media.py @@ -0,0 +1,26 @@ +from __future__ import annotations +import argparse +from pathlib import Path +import json + +from file_handler import * +from ocr_engine import ocr_image, ocr_pdf, OCRConfig +from PIL import Image + +def extract_text_from_file(file_path: str) -> str: + """ + Given a file path to a PDF or image, returns the extracted text. + Raises ValueError for unsupported file types. 
+ """ + cfg = OCRConfig() + p = Path(file_path) + try: + if p.suffix.lower() == ".pdf": + text, _ = ocr_pdf(p, cfg) + elif p.suffix.lower() in {".png", ".jpg", ".jpeg", ".tiff", ".tif", ".bmp", ".webp"}: + text = ocr_image(Image.open(p), cfg) + else: + raise ValueError(f"Unsupported file type: {p.suffix}") + except Exception as e: + raise RuntimeError(f"OCR failed for {p}: {e}") + return text \ No newline at end of file diff --git a/Asset-Assessment-Scanner-V1/scanner.py b/Asset-Assessment-Scanner-V1/scanner.py new file mode 100644 index 0000000..1439ed2 --- /dev/null +++ b/Asset-Assessment-Scanner-V1/scanner.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python3 +""" +scanner.py β€” unified scanner compatible with: + - patterns.json (dict: {id: {pattern, risk, description}}) + - reporter.py (Belle's Stream 4: write_report & generate_console_report) + +Findings schema produced here: + { "pattern": , "file": , "line": , "match": , "description": } + +Exit code: + - 1 if any High-risk finding (per risk_rules.json via reporter.write_report) + - 0 otherwise +""" + +from __future__ import annotations +import argparse +import json +import re +import sys +from bisect import bisect +from typing import Dict, Any, Iterable, List, Tuple +import os + +# v1/v3 utilities (project-provided) +from file_handler import find_files, read_file + +# Belle's reporter (Stream 4) +from reporter import write_report, generate_console_report + +# ---- Defaults (align with your repo) ---- +DEFAULT_PATTERNS_FILE = "patterns.json" +DEFAULT_TARGET_EXTS = [".py", ".txt", ".md", ".cfg", ".json", ".docx", ".csv", ".pdf", ".png", ".jpg", ".jpeg", ".tiff", ".tif", ".bmp", ".webp"] +DEFAULT_OUT = "scan_report.json" + +# ---- Patterns ---- + +def load_patterns(path: str) -> Dict[str, Dict[str, Any]]: + """ + Load pattern definitions from patterns.json + Expected shape: + { + "email": { "pattern": "...", "risk": "Low|High|...", "description": "..." }, + ... 
+ } + """ + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + if not isinstance(data, dict): + raise ValueError("patterns.json must be a JSON object mapping ids to rules.") + for pid, rule in data.items(): + if "pattern" not in rule: + raise ValueError(f"Pattern '{pid}' is missing the 'pattern' field.") + return data + +def compile_patterns(patterns: Dict[str, Dict[str, Any]]) -> Dict[str, re.Pattern]: + """Compile all regexes once with DOTALL (to match across lines where needed).""" + compiled: Dict[str, re.Pattern] = {} + for pid, rule in patterns.items(): + pat = rule["pattern"] + try: + compiled[pid] = re.compile(pat, re.DOTALL) + except re.error as e: + raise ValueError(f"Invalid regex for pattern '{pid}': {e}") + return compiled + +# ---- Scanning helpers ---- + +def _newline_indices(text: str) -> List[int]: + return [i for i, ch in enumerate(text) if ch == "\n"] + +def _line_number(newlines: List[int], idx: int) -> int: + # 1-based line numbers: count of newlines before idx + 1 + return bisect(newlines, idx) + 1 + +def scan_text(text: str, file_path: str, + compiled: Dict[str, re.Pattern], + meta: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Run all compiled patterns over a text blob, recording file and line per match. + Returns a list of finding dicts for reporter.py. 
+ """ + findings: List[Dict[str, Any]] = [] + if not text: + return findings + + newlines = _newline_indices(text) + for pid, regex in compiled.items(): + desc = meta.get(pid, {}).get("description", pid) + for m in regex.finditer(text): + start = m.start() + line = _line_number(newlines, start) + raw = m.group(0) + findings.append({ + "pattern": pid, + "file": file_path, + "line": line, + "match": raw, + "description": desc + }) + return findings + +def scan_paths(paths: Iterable[str], + compiled: Dict[str, re.Pattern], + meta: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]: + all_findings: List[Dict[str, Any]] = [] + for path in paths: + content = read_file(path) + # Ensure we have text (read_file should return str; if bytes, decode) + if isinstance(content, bytes): + try: + content = content.decode("utf-8") + except UnicodeDecodeError: + content = content.decode("latin-1", errors="ignore") + if not isinstance(content, str): + continue + all_findings.extend(scan_text(content, path, compiled, meta)) + return all_findings + +# ---- CLI ---- + +def parse_args(argv: List[str]) -> argparse.Namespace: + ap = argparse.ArgumentParser(description="Unified sensitive-data scanner") + ap.add_argument("--file", help="Single file to scan (overrides --root and --ext)") + ap.add_argument("--root", default=".", help="Root directory to scan (default: current dir)") + ap.add_argument("--patterns", default=DEFAULT_PATTERNS_FILE, help="Path to patterns.json") + ap.add_argument("--out", default=DEFAULT_OUT, help="Path to JSON report output") + ap.add_argument("--ext", nargs="*", default=DEFAULT_TARGET_EXTS, + help="File extensions to include (e.g., .py .txt .md .cfg .json)") + ap.add_argument("--no-console", action="store_true", help="Skip console summary output") + return ap.parse_args(argv) + +# Function to get a valid directory path from the user +def get_valid_path(): + while True: + path = input("Enter the directory path containing the files to scan (press Enter to use the 
project folder): ").strip() + path = path.strip('"').strip("'") # Remove surrounding quotes if present + if not path: # If no input is provided, use the current directory + print("No path provided. Files will be scanned in the project folder.") + print("-" * 63) + return os.getcwd() + elif os.path.isdir(path): # Validate the provided path + print("-" * 63) + return path + + else: + print("We cannot find that path. Please enter a valid directory or press Enter to use the project folder.") + +# ---- Main ---- + +def main(argv: List[str] | None = None) -> int: + ns = parse_args(argv or sys.argv[1:]) + + patterns = load_patterns(ns.patterns) + compiled = compile_patterns(patterns) + + # Check if a specific file is provided + if ns.file: + # Validate the file path + if not os.path.isfile(ns.file): + print(f"[!] The specified file does not exist: {ns.file}") + return 1 + + # Scan only the specified file + print(f"[i] Scanning the specified file: {ns.file}") + findings = scan_paths([ns.file], compiled, patterns) + else: + # Identify valid directory to scan + directory = get_valid_path() + + # Use project helper to expand files under root with extension filter + file_list = list(find_files(directory, ns.ext)) + findings = scan_paths(file_list, compiled, patterns) + + # JSON report (enriched with risk/tip/laws by reporter.write_report) + enriched = write_report(findings, out_path=ns.out) + + # Console summary (masked) + if not ns.no_console: + generate_console_report(findings) + + # Exit code policy: fail if any High risk present + has_high = any(f.get("risk") == "High" for f in enriched) + if has_high: + print("[!] High-risk data found. Failing scan.") + return 1 + + if enriched: + print("[i] Findings present. Review the report.") + else: + print("[βœ“] No sensitive data detected.") + return 0 + +if __name__ == "__main__": + raise SystemExit(main()) \ No newline at end of file