Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .github/workflows/owasp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ on:
pull_request_target:
types: [opened, synchronize, reopened]


permissions:
contents: read
pull-requests: write
issues: write

jobs:
scan:
Expand Down Expand Up @@ -81,7 +81,6 @@ jobs:
else
echo "vulnerabilities_found=false" >> $GITHUB_OUTPUT
fi
exit 0

- name: Create PR comment body
if: always()
Expand Down Expand Up @@ -125,5 +124,10 @@ jobs:
- name: Fail if vulnerabilities found
if: steps.owasp.outputs.vulnerabilities_found == 'true'
run: |
echo "::error::OWASP scanner reported vulnerabilities."
echo "::error::❌ Vulnerabilities detected! Merge blocked."
exit 1

- name: Safe to merge
if: steps.owasp.outputs.vulnerabilities_found == 'false'
run: |
echo "✅ No vulnerabilities found. Safe to merge."
104 changes: 49 additions & 55 deletions scanner/core.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
# Responsibilities:
# - Reads target file, stores code lines
# - Manages vulnerability list
# - Runs all rule checks (auto-discovers rules in scanner/rules)
# - Provides add_vulnerability callback
# - Prints a grouped, colourised report

import os
import importlib
import pkgutil
Expand All @@ -21,7 +14,6 @@ def _load_rule_modules():
if hasattr(mod, "check"):
modules.append(mod)

# Stable order: by CATEGORY "A01: ..." if provided, else by module name
def key(m):
cat = getattr(m, "CATEGORY", "")
head = cat.split(":", 1)[0].strip() if cat else ""
Expand Down Expand Up @@ -61,7 +53,6 @@ def parse_file(self):

def run_checks(self):
for rule in RULE_MODULES:
# each rule exposes: check(code_lines, add_vulnerability)
rule.check(self.code_lines, self.add_vulnerability)

def run(self):
Expand All @@ -70,48 +61,47 @@ def run(self):
self.run_checks()

def report(self):
# ---- colour helpers ----
"""Outputs results with colors locally, or clean Markdown when in GitHub Actions."""
def supports_truecolor() -> bool:
return os.environ.get("COLORTERM", "").lower() in ("truecolor", "24bit")

disable_color = os.environ.get("GITHUB_ACTIONS") == "true"

def rgb(r, g, b) -> str:
return f"\033[38;2;{r};{g};{b}m"

ANSI = {
"reset": "\033[0m",
"bold": "\033[1m",
"cyan": "\033[96m",
"magenta": "\033[95m",
"yellow": "\033[93m",
"red": "\033[91m",
"green": "\033[92m",
"blue": "\033[94m",
"reset": "" if disable_color else "\033[0m",
"bold": "" if disable_color else "\033[1m",
"cyan": "" if disable_color else "\033[96m",
"magenta": "" if disable_color else "\033[95m",
"yellow": "" if disable_color else "\033[93m",
"red": "" if disable_color else "\033[91m",
"green": "" if disable_color else "\033[92m",
"blue": "" if disable_color else "\033[94m",
}

TRUECOLOR = supports_truecolor()

# Severity colours (true-color -> fallback)
CRIT = (rgb(220, 20, 60) if TRUECOLOR else ANSI["red"] + ANSI["bold"]) # crimson
HIGH = (rgb(255, 0, 0) if TRUECOLOR else ANSI["red"]) # red
MED = (rgb(255, 165, 0) if TRUECOLOR else ANSI["yellow"]) # orange-ish
LOW = (rgb(0, 200, 0) if TRUECOLOR else ANSI["green"]) # green

RESET = ANSI["reset"]
BOLD = ANSI["bold"]
HDR = (rgb(180, 130, 255) if TRUECOLOR else ANSI["magenta"]) # section header
TITLE = (rgb(120, 220, 200) if TRUECOLOR else ANSI["cyan"]) # title
SUM = (rgb(255, 215, 0) if TRUECOLOR else ANSI["yellow"]) # summary label
TRUECOLOR = supports_truecolor() and not disable_color

sev_color = {"CRITICAL": CRIT, "HIGH": HIGH, "MEDIUM": MED, "LOW": LOW}
sev_color = {
"CRITICAL": "**CRITICAL**" if disable_color else (rgb(220, 20, 60) if TRUECOLOR else ANSI["red"] + ANSI["bold"]),
"HIGH": "**HIGH**" if disable_color else (rgb(255, 0, 0) if TRUECOLOR else ANSI["red"]),
"MEDIUM": "**MEDIUM**" if disable_color else (rgb(255, 165, 0) if TRUECOLOR else ANSI["yellow"]),
"LOW": "**LOW**" if disable_color else (rgb(0, 200, 0) if TRUECOLOR else ANSI["green"]),
}

print(f"\n{BOLD}{TITLE}Scan Results for {self.file_path}:{RESET}")
# ---- Print header ----
if disable_color:
print(f"\n### 🔒 OWASP Scanner Results for `{self.file_path}`")
else:
print(f"\n{ANSI['bold']}{ANSI['cyan']}Scan Results for {self.file_path}:{ANSI['reset']}")

if not self.vulnerabilities:
ok = rgb(0, 200, 0) if TRUECOLOR else ANSI["green"]
print(f"{ok}✅ No vulnerabilities found.{RESET}")
msg = "✅ No vulnerabilities found."
print(msg)
return

# Group by category
# ---- Group by category ----
groups = {}
for v in self.vulnerabilities:
groups.setdefault(v["category"], []).append(v)
Expand All @@ -122,25 +112,29 @@ def cat_key(cat: str):

for cat in sorted(groups.keys(), key=cat_key):
items = sorted(groups[cat], key=lambda x: x["line"])
# tally
sev_counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
for v in items:
sev_counts[v["severity"]] = sev_counts.get(v["severity"], 0) + 1

total = len(items)
print(f"\n{BOLD}{HDR}=== {cat} ({total} finding{'s' if total != 1 else ''}) ==={RESET}")

chips = []
for k in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]:
n = sev_counts.get(k, 0)
if n:
chips.append(f"{sev_color[k]}{k.title()}{RESET}: {n}")
if chips:
print(f"{SUM}Summary:{RESET} " + ", ".join(chips))

sev_counts[v["severity"]] += 1

if disable_color:
print(f"\n#### {cat} ({len(items)} findings)")
chips = []
for k in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]:
if sev_counts[k]:
chips.append(f"{k}: {sev_counts[k]}")
if chips:
print(f"**Summary:** " + ", ".join(chips))
else:
print(f"\n{ANSI['bold']}{ANSI['magenta']}=== {cat} ({len(items)} findings) ==={ANSI['reset']}")

# ---- List individual vulnerabilities ----
for v in items:
sc = sev_color.get(v["severity"], ANSI["blue"])
print(f"\n {BOLD}• Line {v['line']} |{RESET} "
f"Severity {sc}{v['severity']}{RESET} | "
f"Confidence {v['confidence']}")
print(f" → {v['description']}")
sev = sev_color.get(v["severity"], v["severity"])
if disable_color:
print(f"- Line {v['line']} | Severity {sev} | Confidence {v['confidence']}")
print(f" → {v['description']}")
else:
print(f" {ANSI['bold']}• Line {v['line']} |{ANSI['reset']} "
f"Severity {sev}{ANSI['reset']} | "
f"Confidence {v['confidence']}")
print(f" → {v['description']}")
40 changes: 26 additions & 14 deletions scanner/main.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
# Entry point for the OWASP PR Scanner CLI tool.
# This script parses the command-line arguments (i.e., the file path to scan),
# initializes the VulnerabilityScanner with the specified file, runs all rule checks,
# and prints a formatted vulnerability report to the console.
import sys
import os
from scanner.core import VulnerabilityScanner


import argparse
from .core import VulnerabilityScanner
def main(file_paths):
any_vulns = False

for file_path in file_paths:
scanner = VulnerabilityScanner(file_path)
if not scanner.parse_file():
if os.environ.get("GITHUB_ACTIONS") == "true":
print(f"\n### ⚠️ File `{file_path}` not found")
else:
print(f"\n[!] File {file_path} does not exist.")
continue

def main():
parser = argparse.ArgumentParser(description="OWASP PR Vulnerability Scanner")
parser.add_argument("path", help="Path to Python file to scan")
args = parser.parse_args()
scanner.run_checks()
scanner.report()

if scanner.vulnerabilities:
any_vulns = True

if any_vulns:
sys.exit(1)

scanner = VulnerabilityScanner(args.path)
scanner.run()
scanner.report()

if __name__ == "__main__":
main()
if len(sys.argv) < 2:
print("Usage: python scanner/main.py <file1> <file2> ...")
sys.exit(1)

main(sys.argv[1:])