diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7ff4fe22..72a27189 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -26,4 +26,5 @@ repos: additional_dependencies: - types-simplejson - types-attrs + - types-PyYAML - pydantic~=2.0 diff --git a/docs/user-guide/testing-auth-rules.md b/docs/user-guide/testing-auth-rules.md new file mode 100644 index 00000000..5eebada0 --- /dev/null +++ b/docs/user-guide/testing-auth-rules.md @@ -0,0 +1,252 @@ +# Testing Auth Rules + +The `stac-auth-tests` CLI tool validates your custom filter classes by running them in the same environment as production, ensuring your authorization rules work as expected. + +## Overview + +Run sanity checks on your auth filters within your production environment (Docker containers, kubernetes pods, etc.) to: + +- **Validate filter logic** - Ensure filters generate correct CQL2 expressions and match items as expected +- **Catch regressions** - Verify changes don't break expected behavior before deployment +- **Test with production data** - Run filters against real STAC APIs and databases in your stack + +## Test File Format + +Test files use YAML (recommended) or JSON. Each test case contains: +- **context** - Request and JWT payload passed to your filter +- **tests** - Tuples of `[item, expected_match]` to validate + +### Example Test File + +```yaml +# tests/auth_rules.yaml +# Define reusable items with YAML anchors +items: + public_item: &public_item + id: item-1 + type: Feature + collection: my-collection + properties: + private: false + geometry: null + + private_item: &private_item + id: item-2 + type: Feature + collection: my-collection + properties: + private: true + geometry: null + +# Define reusable contexts +contexts: + anonymous: &anonymous + req: + path: /collections/my-collection/items + method: GET + headers: {} + query_params: {} + path_params: {} + + authenticated: &authenticated + req: + path: /collections/my-collection/items + method: GET + headers: + authorization: Bearer token + query_params: {} + path_params: {} + payload: + sub: user123 + collections: ["my-collection"] + +test_cases: + - name: Anonymous users see only public items + context: *anonymous + tests: + - [*public_item, true] + - [*private_item, false] + + - name: Authenticated users see their collections + context: *authenticated + tests: + - [*public_item, true] + - [*private_item, true] +``` + +See `tests/example_auth_rules.yaml` for a complete example. + +## Running Tests + +### With Docker Compose (Local Development) + +Add test files to your compose volumes: + +```yaml +# docker-compose.yaml +services: + proxy: + volumes: + - ./tests:/app/tests +``` + +Run tests in your stack: + +```bash +# Start services +docker compose up -d + +# Run tests (creates isolated container with access to your stack) +docker compose run --rm proxy stac-auth-tests \ + --filter-class "my_filters:ItemsFilter" \ + --test-file /app/tests/auth_rules.yaml +``` + +This approach: +- Tests against your actual upstream STAC API +- Runs filters that make API calls (e.g., fetching public collections) +- Uses the same environment variables and network as production + +### In Production Containers + +```dockerfile +FROM ghcr.io/developmentseed/stac-auth-proxy:latest +COPY ./my_filters.py /app/my_filters.py +COPY ./tests /app/tests +``` + +```bash +# Build and test +docker build -t my-stac-proxy . +docker run --rm \ + -e UPSTREAM_URL=http://stac-api:8080 \ + my-stac-proxy \ + stac-auth-tests \ + --filter-class "my_filters:ItemsFilter" \ + --test-file /app/tests/auth_rules.yaml +``` + +### Locally (Development) + +```bash +pip install -e . + +stac-auth-tests \ + --filter-class "stac_auth_proxy.filters:Template" \ + --filter-args '["(properties.private = false)"]' \ + --test-file tests/auth_rules.yaml +``` + +## CLI Options + +```bash +stac-auth-tests \ + --filter-class "module.path:ClassName" # Required: filter class to test + --filter-args '[...]' # Optional: JSON array of positional args + --filter-kwargs '{...}' # Optional: JSON object of keyword args + --test-file path/to/tests.yaml # Required: test file path +``` + +## Example: Testing Custom Filter + +```python +# my_filters.py +import dataclasses +from typing import Any + +@dataclasses.dataclass +class ItemsFilter: + collections_claim: str = "collections" + + async def __call__(self, context: dict[str, Any]) -> str: + jwt = context.get("payload") + if jwt: + collections = jwt.get(self.collections_claim, []) + return f"collection IN ({','.join(repr(c) for c in collections)})" + return "(private IS NULL OR private = false)" +``` + +```yaml +# tests/my_tests.yaml +items: + allowed: &allowed + id: item-1 + collection: allowed-col + type: Feature + properties: {} + geometry: null + + forbidden: &forbidden + id: item-2 + collection: forbidden-col + type: Feature + properties: {} + geometry: null + +test_cases: + - name: User with collection access + context: + req: + path: /search + method: POST + headers: + authorization: Bearer token + query_params: {} + path_params: {} + payload: + sub: user123 + collections: ["allowed-col"] + tests: + - [*allowed, true] + - [*forbidden, false] +``` + +```bash +# Test it +docker compose run --rm proxy stac-auth-tests \ + --filter-class "my_filters:ItemsFilter" \ + --test-file /app/tests/my_tests.yaml +``` + +## CI/CD Integration + +```yaml +# .github/workflows/test.yml +name: Test Auth Rules + +on: [push, pull_request] + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Build image + run: docker build -t test-image . + - name: Test auth rules + run: | + docker run --rm test-image \ + stac-auth-tests \ + --filter-class "my_filters:ItemsFilter" \ + --test-file /app/tests/auth_rules.yaml +``` + +## Troubleshooting + +**"Failed to generate or validate CQL2 filter"** +- Your filter returned invalid CQL2 syntax +- Check that property references and operators are correct + +**"Item match failures"** +- Filter is valid but items don't match as expected +- Verify property paths (e.g., `properties.private` vs `private`) +- Check data types match (strings vs booleans) + +**"Error loading filter class"** +- Check class path format: `module.path:ClassName` +- Verify module is in Python path and dependencies are installed + +## See Also + +- [Record-Level Authorization Guide](record-level-auth.md) +- [CQL2 Specification](https://docs.ogc.org/DRAFTS/21-065.html) diff --git a/pyproject.toml b/pyproject.toml index 4c3ac1b0..617d1288 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "jinja2>=3.1.4", "pydantic-settings>=2.6.1", "pyjwt>=2.10.1", + "pyyaml>=6.0", "starlette-cramjam>=0.4.0", "uvicorn>=0.32.1", ] @@ -26,6 +27,9 @@ readme = "README.md" requires-python = ">=3.10" version = "0.10.1" +[project.scripts] +stac-auth-tests = "stac_auth_proxy.cli.test_auth_rules:main" + [project.optional-dependencies] docs = [ "griffe-fieldz>=0.3.0", @@ -97,6 +101,7 @@ dev = [ "pytest>=8.3.3", "ruff>=0.0.238", "starlette-cramjam>=0.4.0", + "types-PyYAML", "types-simplejson", "types-attrs", ] diff --git a/src/stac_auth_proxy/cli/__init__.py b/src/stac_auth_proxy/cli/__init__.py new file mode 100644 index 00000000..d0b02e9d --- /dev/null +++ b/src/stac_auth_proxy/cli/__init__.py @@ -0,0 +1 @@ +"""CLI tools for stac-auth-proxy.""" diff --git a/src/stac_auth_proxy/cli/test_auth_rules.py b/src/stac_auth_proxy/cli/test_auth_rules.py new file mode 100644 index 00000000..1691535f --- /dev/null +++ b/src/stac_auth_proxy/cli/test_auth_rules.py @@ -0,0 +1,334 @@ +#!/usr/bin/env python3 +""" +CLI tool for testing STAC auth filter rules. + +This tool allows you to test your custom filter classes (ITEMS_FILTER_CLS or +COLLECTIONS_FILTER_CLS) by running them against test cases and validating the +results match your expectations. +""" + +import argparse +import asyncio +import importlib +import json +import sys +import textwrap +from pathlib import Path +from typing import Any + +import yaml +from cql2 import Expr + + +class Colors: + """ANSI color codes for terminal output.""" + + GREEN = "\033[92m" + RED = "\033[91m" + YELLOW = "\033[93m" + BLUE = "\033[94m" + RESET = "\033[0m" + BOLD = "\033[1m" + + +def print_colored(text: str, color: str) -> None: + """Print colored text to stdout.""" + print(f"{color}{text}{Colors.RESET}") + + +def load_filter_class(cls_path: str, args: list[Any], kwargs: dict[str, Any]) -> Any: + """ + Dynamically load and instantiate a filter class. + + Args: + cls_path: Module path and class name separated by colon (e.g., "module.path:ClassName") + args: Positional arguments for the class constructor + kwargs: Keyword arguments for the class constructor + + Returns: + Instantiated filter class + + Raises: + ValueError: If cls_path format is invalid + ImportError: If module cannot be imported + AttributeError: If class is not found in module + + """ + if ":" not in cls_path: + raise ValueError( + f"Invalid class path format: {cls_path}. " + "Expected format: 'module.path:ClassName'" + ) + + module_path, class_name = cls_path.rsplit(":", 1) + module = importlib.import_module(module_path) + filter_cls = getattr(module, class_name) + return filter_cls(*args, **kwargs) + + +def load_test_file(file_path: Path) -> dict[str, Any]: + """ + Load test cases from a YAML or JSON file. + + Args: + file_path: Path to the test file (YAML or JSON) + + Returns: + Parsed file content + + Raises: + FileNotFoundError: If test file doesn't exist + yaml.YAMLError: If YAML file contains invalid syntax + json.JSONDecodeError: If JSON file contains invalid syntax + + """ + if not file_path.exists(): + raise FileNotFoundError(f"Test file not found: {file_path}") + + with open(file_path) as f: + # Detect format by file extension + if file_path.suffix in (".yaml", ".yml"): + return yaml.safe_load(f) + else: + return json.load(f) + + +async def run_test_case( + test_case: dict[str, Any], + filter_instance: Any, + case_number: int, +) -> tuple[bool, str]: + """ + Run a single test case. + + Args: + test_case: Test case configuration containing: + - name: Test case name (optional) + - context: Context dict to pass to the filter + - tests: List of [item, expected] tuples + + filter_instance: Instantiated filter class + case_number: Test case number for display + + Returns: + Tuple of (success: bool, message: str) + + """ + context = test_case["context"] + test_pairs = test_case["tests"] + + try: + # Generate CQL2 filter from context + cql2_filter_expr = await filter_instance(context) + + # Parse into Expr + if isinstance(cql2_filter_expr, str): + cql2_expr = Expr(cql2_filter_expr) + else: + cql2_expr = Expr(**cql2_filter_expr) + + # Validate the expression + cql2_expr.validate() + + except Exception as e: + return False, f"Failed to generate or validate CQL2 filter: {e}" + + # Test each item + failures = [] + for idx, (item, expected_result) in enumerate(test_pairs): + try: + actual_result = cql2_expr.matches(item) + if actual_result != expected_result: + item_id = item.get("id", f"item #{idx}") + failures.append( + f" Item {item_id}: expected {expected_result}, got {actual_result}" + ) + except Exception as e: + item_id = item.get("id", f"item #{idx}") + failures.append(f" Item {item_id}: error evaluating match - {e}") + + if failures: + failure_msg = "\n".join(failures) + return False, f"Item match failures:\n{failure_msg}" + + return True, f"All {len(test_pairs)} items matched expected results" + + +async def run_tests( + filter_instance: Any, + test_data: dict[str, Any], +) -> tuple[int, int]: + """ + Run all test cases. + + Args: + filter_instance: Instantiated filter class + test_data: Test data containing list of test cases + + Returns: + Tuple of (passed_count, failed_count) + + """ + test_cases = test_data.get("test_cases", []) + + if not test_cases: + print_colored("Warning: No test cases found in test file", Colors.YELLOW) + return 0, 0 + + print_colored( + f"\n{Colors.BOLD}Running {len(test_cases)} test case(s)...\n", Colors.BLUE + ) + + passed = 0 + failed = 0 + + for idx, test_case in enumerate(test_cases, start=1): + name = test_case.get("name", f"Test case #{idx}") + print(f"{Colors.BOLD}[{idx}/{len(test_cases)}]{Colors.RESET} {name}...") + + success, message = await run_test_case(test_case, filter_instance, idx) + + if success: + print_colored(f" ✓ PASS: {message}", Colors.GREEN) + passed += 1 + else: + print_colored(f" ✗ FAIL: {message}", Colors.RED) + failed += 1 + + print() # Empty line between test cases + + return passed, failed + + +def main() -> int: + """ + Run the CLI. + + Returns: + Exit code (0 for success, 1 for failures) + + """ + parser = argparse.ArgumentParser( + description="Test STAC auth filter rules", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=textwrap.dedent( + """ + Example usage: + # Test a Template filter + stac-auth-tests \\ + --filter-class "stac_auth_proxy.filters:Template" \\ + --filter-args '["(properties.private = false)"]' \\ + --test-file tests/auth_rules.yaml + + # Test an OPA filter with kwargs + stac-auth-tests \\ + --filter-class "stac_auth_proxy.filters:Opa" \\ + --filter-args '["http://opa:8181", "stac/items"]' \\ + --filter-kwargs '{"cache_ttl": 30.0}' \\ + --test-file tests/auth_rules.yaml + + # Test a custom filter + stac-auth-tests \\ + --filter-class "my_module.filters:CustomFilter" \\ + --filter-args '["arg1", "arg2"]' \\ + --test-file tests/my_auth_tests.yaml + """ + ), + ) + + parser.add_argument( + "--filter-class", + required=True, + help='Filter class path in format "module.path:ClassName"', + ) + parser.add_argument( + "--filter-args", + default="[]", + help="JSON array of positional arguments for filter class (default: [])", + ) + parser.add_argument( + "--filter-kwargs", + default="{}", + help="JSON object of keyword arguments for filter class (default: {})", + ) + parser.add_argument( + "--test-file", + type=Path, + required=True, + help="Path to YAML or JSON file containing test cases", + ) + + args = parser.parse_args() + + # Parse JSON args + try: + filter_args = json.loads(args.filter_args) + if not isinstance(filter_args, list): + print_colored("Error: --filter-args must be a JSON array", Colors.RED) + return 1 + except json.JSONDecodeError as e: + print_colored(f"Error parsing --filter-args: {e}", Colors.RED) + return 1 + + try: + filter_kwargs = json.loads(args.filter_kwargs) + if not isinstance(filter_kwargs, dict): + print_colored("Error: --filter-kwargs must be a JSON object", Colors.RED) + return 1 + except json.JSONDecodeError as e: + print_colored(f"Error parsing --filter-kwargs: {e}", Colors.RED) + return 1 + + # Load filter class + try: + print_colored(f"\n{Colors.BOLD}Loading filter class...", Colors.BLUE) + print(f" Class: {args.filter_class}") + print(f" Args: {filter_args}") + print(f" Kwargs: {filter_kwargs}") + filter_instance = load_filter_class( + args.filter_class, filter_args, filter_kwargs + ) + print_colored(" ✓ Filter loaded successfully\n", Colors.GREEN) + except Exception as e: + print_colored(f"Error loading filter class: {e}", Colors.RED) + return 1 + + # Load test file + try: + print_colored(f"{Colors.BOLD}Loading test file...", Colors.BLUE) + print(f" File: {args.test_file}") + test_data = load_test_file(args.test_file) + print_colored(" ✓ Test file loaded successfully", Colors.GREEN) + except Exception as e: + print_colored(f"Error loading test file: {e}", Colors.RED) + return 1 + + # Run tests + passed, failed = asyncio.run(run_tests(filter_instance, test_data)) + + # Print summary + print_colored(f"\n{Colors.BOLD}{'=' * 60}", Colors.BLUE) + print_colored(f"{Colors.BOLD}Test Summary", Colors.BLUE) + print_colored(f"{Colors.BOLD}{'=' * 60}", Colors.BLUE) + print_colored(f"Passed: {passed}", Colors.GREEN) + if failed > 0: + print_colored(f"Failed: {failed}", Colors.RED) + else: + print(f"Failed: {failed}") + total = passed + failed + print(f"Total: {total}") + print_colored(f"{'=' * 60}\n", Colors.BLUE) + + if failed > 0: + print_colored("Some tests failed!", Colors.RED) + return 1 + elif total == 0: + print_colored("No tests were run!", Colors.YELLOW) + return 1 + else: + print_colored("All tests passed!", Colors.GREEN) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/example_auth_rules.yaml b/tests/example_auth_rules.yaml new file mode 100644 index 00000000..05affb6e --- /dev/null +++ b/tests/example_auth_rules.yaml @@ -0,0 +1,205 @@ +# Example auth rules test file demonstrating YAML anchors and aliases +# +# This file shows how to use YAML features to reduce repetition: +# - Anchors (&name) define reusable content +# - Aliases (*name) reference anchored content +# - The tuple format pairs items with their expected results + +# Define reusable STAC items using YAML anchors +items: + # Public items (no private property or private=false) + public_item_1: &public_item_1 + id: public-item-1 + type: Feature + collection: my-collection + properties: + private: false + created: "2024-01-01" + geometry: null + + public_item_no_flag: &public_item_no_flag + id: public-item-2 + type: Feature + collection: my-collection + properties: + created: "2024-01-02" + geometry: null + + # Private items + private_item_1: &private_item_1 + id: private-item-1 + type: Feature + collection: my-collection + properties: + private: true + sensitive: "data" + geometry: null + + private_item_2: &private_item_2 + id: private-item-2 + type: Feature + collection: another-collection + properties: + private: true + geometry: null + + # Items from different collections + allowed_collection_item: &allowed_collection_item + id: allowed-item-1 + type: Feature + collection: my-collection + properties: + private: true + geometry: null + + another_allowed_item: &another_allowed_item + id: allowed-item-2 + type: Feature + collection: another-collection + properties: + private: true + geometry: null + + forbidden_collection_item: &forbidden_collection_item + id: forbidden-item-1 + type: Feature + collection: forbidden-collection + properties: + private: true + secret: "classified" + geometry: null + +# Define reusable context blocks +contexts: + anonymous: &anonymous_context + req: + path: /collections/my-collection/items + method: GET + query_params: {} + path_params: + collection_id: my-collection + headers: {} + # No payload = anonymous user + + authenticated_user: &authenticated_user_context + req: + path: /collections/my-collection/items + method: GET + query_params: {} + path_params: + collection_id: my-collection + headers: + authorization: Bearer fake-token + payload: + sub: user123 + collections: + - my-collection + - another-collection + superuser: "false" + + superuser: &superuser_context + req: + path: /search + method: POST + query_params: {} + path_params: {} + headers: + authorization: Bearer admin-token + payload: + sub: admin + collections: [] + superuser: "true" + +# Test cases using anchors and the tuple format +test_cases: + - name: Anonymous user can only see public items + context: *anonymous_context + tests: + - [*public_item_1, true] # Public item with private=false + - [*public_item_no_flag, true] # Public item without private flag + - [*private_item_1, false] # Private item should be hidden + - [*private_item_2, false] # Another private item should be hidden + + - name: Authenticated user with collection access + context: *authenticated_user_context + tests: + # User has access to "my-collection" and "another-collection" + - [*allowed_collection_item, true] # In allowed collection + - [*another_allowed_item, true] # In another allowed collection + - [*forbidden_collection_item, false] # In forbidden collection + - [*public_item_1, true] # Public items still visible + + - name: Authenticated user with limited collection access + context: + req: + path: /search + method: POST + query_params: {} + path_params: {} + headers: + authorization: Bearer limited-token + payload: + sub: limited-user + collections: + - my-collection # Only has access to my-collection + superuser: "false" + tests: + - [*allowed_collection_item, true] # In allowed collection + - [*another_allowed_item, false] # NOT in allowed collection + - [*forbidden_collection_item, false] # NOT in allowed collection + + - name: Superuser can access everything + context: *superuser_context + tests: + - [*public_item_1, true] + - [*private_item_1, true] + - [*private_item_2, true] + - [*allowed_collection_item, true] + - [*another_allowed_item, true] + - [*forbidden_collection_item, true] + + - name: Anonymous user with search endpoint + context: + req: + path: /search + method: POST + query_params: {} + path_params: {} + headers: {} + # No payload = anonymous + tests: + - [*public_item_1, true] + - [*public_item_no_flag, true] + - [*private_item_1, false] + - [*private_item_2, false] + + - name: User with empty collections claim + context: + req: + path: /search + method: POST + query_params: {} + path_params: {} + headers: + authorization: Bearer token + payload: + sub: user-no-collections + collections: [] # Empty collections list + superuser: "false" + tests: + # Should only see public items + - [*public_item_1, true] + - [*public_item_no_flag, true] + - [*private_item_1, false] + - [*private_item_2, false] + - [*allowed_collection_item, false] + - [*another_allowed_item, false] + + - name: Edge case - Item with missing collection field + context: *authenticated_user_context + tests: + - id: no-collection-item + type: Feature + properties: {} + geometry: null + - false # Should not match any filter that checks collection diff --git a/tests/test_cli_test_auth_rules.py b/tests/test_cli_test_auth_rules.py new file mode 100644 index 00000000..5e7f76a2 --- /dev/null +++ b/tests/test_cli_test_auth_rules.py @@ -0,0 +1,705 @@ +"""Functional tests for the CLI auth rules testing tool.""" + +import json +import sys +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest +import yaml + +from stac_auth_proxy.cli.test_auth_rules import ( + Colors, + load_filter_class, + load_test_file, + main, + print_colored, + run_test_case, + run_tests, +) + + +class TestLoadFilterClass: + """Tests for load_filter_class function.""" + + def test_load_template_filter_no_args(self): + """Test loading Template filter without arguments.""" + filter_instance = load_filter_class( + "stac_auth_proxy.filters:Template", + ["(properties.private = false)"], + {}, + ) + assert filter_instance is not None + assert filter_instance.template_str == "(properties.private = false)" + + def test_load_template_filter_with_args(self): + """Test loading Template filter with arguments.""" + template = "{{ payload.sub }}" + filter_instance = load_filter_class( + "stac_auth_proxy.filters:Template", + [template], + {}, + ) + assert filter_instance.template_str == template + + def test_load_filter_invalid_path_format(self): + """Test loading filter with invalid path format (missing colon).""" + with pytest.raises(ValueError, match="Invalid class path format"): + load_filter_class("stac_auth_proxy.filters.Template", [], {}) + + def test_load_filter_nonexistent_module(self): + """Test loading filter from non-existent module.""" + with pytest.raises(ImportError): + load_filter_class("nonexistent_module:Template", [], {}) + + def test_load_filter_nonexistent_class(self): + """Test loading non-existent class from valid module.""" + with pytest.raises(AttributeError): + load_filter_class("stac_auth_proxy.filters:NonExistentClass", [], {}) + + +class TestLoadTestFile: + """Tests for load_test_file function.""" + + def test_load_yaml_file(self, tmp_path): + """Test loading valid YAML test file.""" + test_data = { + "test_cases": [ + { + "name": "Test case 1", + "context": {"req": {}, "payload": None}, + "tests": [[{"id": "item1"}, True]], + } + ] + } + yaml_file = tmp_path / "test.yaml" + yaml_file.write_text(yaml.dump(test_data)) + + result = load_test_file(yaml_file) + assert result == test_data + + def test_load_yml_file(self, tmp_path): + """Test loading valid .yml test file.""" + test_data = {"test_cases": []} + yml_file = tmp_path / "test.yml" + yml_file.write_text(yaml.dump(test_data)) + + result = load_test_file(yml_file) + assert result == test_data + + def test_load_json_file(self, tmp_path): + """Test loading valid JSON test file.""" + test_data = { + "test_cases": [ + { + "name": "Test case 1", + "context": {"req": {}, "payload": None}, + "tests": [[{"id": "item1"}, True]], + } + ] + } + json_file = tmp_path / "test.json" + json_file.write_text(json.dumps(test_data)) + + result = load_test_file(json_file) + assert result == test_data + + def test_load_nonexistent_file(self, tmp_path): + """Test loading non-existent file raises FileNotFoundError.""" + nonexistent = tmp_path / "nonexistent.yaml" + with pytest.raises(FileNotFoundError): + load_test_file(nonexistent) + + def test_load_invalid_yaml(self, tmp_path): + """Test loading invalid YAML raises error.""" + invalid_yaml = tmp_path / "invalid.yaml" + invalid_yaml.write_text("invalid: yaml: content: [") + + with pytest.raises(yaml.YAMLError): + load_test_file(invalid_yaml) + + def test_load_invalid_json(self, tmp_path): + """Test loading invalid JSON raises error.""" + invalid_json = tmp_path / "invalid.json" + invalid_json.write_text("{invalid json") + + with pytest.raises(json.JSONDecodeError): + load_test_file(invalid_json) + + +class TestRunTestCase: + """Tests for run_test_case function.""" + + @pytest.mark.asyncio + async def test_simple_passing_test(self): + """Test a simple passing test case.""" + filter_instance = load_filter_class( + "stac_auth_proxy.filters:Template", + ["(properties.private = false)"], + {}, + ) + + test_case = { + "context": {"req": {}, "payload": None}, + "tests": [ + [{"id": "item1", "properties": {"private": False}}, True], + [{"id": "item2", "properties": {"private": True}}, False], + ], + } + + success, message = await run_test_case(test_case, filter_instance, 1) + assert success is True + assert "All 2 items matched expected results" in message + + @pytest.mark.asyncio + async def test_failing_test_mismatch(self): + """Test a test case with mismatched expectations.""" + filter_instance = load_filter_class( + "stac_auth_proxy.filters:Template", + ["(properties.private = false)"], + {}, + ) + + test_case = { + "context": {"req": {}, "payload": None}, + "tests": [ + [{"id": "item1", "properties": {"private": False}}, False], # Wrong! + ], + } + + success, message = await run_test_case(test_case, filter_instance, 1) + assert success is False + assert "Item match failures" in message + assert "item1" in message + + @pytest.mark.asyncio + async def test_invalid_cql2_expression(self): + """Test handling of invalid CQL2 filter generation.""" + # Create a mock filter that returns invalid CQL2 + mock_filter = AsyncMock(return_value="invalid cql2 (((") + + test_case = { + "context": {"req": {}, "payload": None}, + "tests": [[{"id": "item1"}, True]], + } + + success, message = await run_test_case(test_case, mock_filter, 1) + assert success is False + assert "Failed to generate or validate CQL2 filter" in message + + @pytest.mark.asyncio + async def test_templated_filter_with_context(self): + """Test templated filter using context variables.""" + filter_instance = load_filter_class( + "stac_auth_proxy.filters:Template", + ["{{ '(properties.private = false)' if payload is none else 'true' }}"], + {}, + ) + + # Test with no payload (anonymous) + test_case_anon = { + "context": {"req": {}, "payload": None}, + "tests": [ + [{"id": "item1", "properties": {"private": False}}, True], + [{"id": "item2", "properties": {"private": True}}, False], + ], + } + + success, message = await run_test_case(test_case_anon, filter_instance, 1) + assert success is True + + # Test with payload (authenticated) + test_case_auth = { + "context": {"req": {}, "payload": {"sub": "user123"}}, + "tests": [ + [{"id": "item1", "properties": {"private": False}}, True], + [{"id": "item2", "properties": {"private": True}}, True], + ], + } + + success, message = await run_test_case(test_case_auth, filter_instance, 2) + assert success is True + + @pytest.mark.asyncio + async def test_cql2_json_format(self): + """Test filter that returns CQL2 JSON format.""" + filter_instance = load_filter_class( + "stac_auth_proxy.filters:Template", + ['{"op": "=", "args": [{"property": "private"}, false]}'], + {}, + ) + + test_case = { + "context": {"req": {}, "payload": None}, + "tests": [ + [{"id": "item1", "private": False}, True], + [{"id": "item2", "private": True}, False], + ], + } + + success, message = await run_test_case(test_case, filter_instance, 1) + assert success is True + + +class TestRunTests: + """Tests for run_tests function.""" + + @pytest.mark.asyncio + async def test_run_multiple_test_cases(self): + """Test running multiple test cases.""" + filter_instance = load_filter_class( + "stac_auth_proxy.filters:Template", + ["(properties.private = false)"], + {}, + ) + + test_data = { + "test_cases": [ + { + "name": "Public items only", + "context": {"req": {}, "payload": None}, + "tests": [ + [{"id": "item1", "properties": {"private": False}}, True], + ], + }, + { + "name": "Private items excluded", + "context": {"req": {}, "payload": None}, + "tests": [ + [{"id": "item2", "properties": {"private": True}}, False], + ], + }, + ] + } + + passed, failed = await run_tests(filter_instance, test_data) + assert passed == 2 + assert failed == 0 + + @pytest.mark.asyncio + async def test_run_with_failures(self): + """Test running tests with some failures.""" + filter_instance = load_filter_class( + "stac_auth_proxy.filters:Template", + ["(properties.private = false)"], + {}, + ) + + test_data = { + "test_cases": [ + { + "name": "Passing test", + "context": {"req": {}, "payload": None}, + "tests": [ + [{"id": "item1", "properties": {"private": False}}, True], + ], + }, + { + "name": "Failing test", + "context": {"req": {}, "payload": None}, + "tests": [ + [{"id": "item2", "properties": {"private": False}}, False], + ], + }, + ] + } + + passed, failed = await run_tests(filter_instance, test_data) + assert passed == 1 + assert failed == 1 + + @pytest.mark.asyncio + async def test_run_empty_test_cases(self): + """Test running with no test cases.""" + filter_instance = load_filter_class( + "stac_auth_proxy.filters:Template", + ["true"], + {}, + ) + + test_data = {"test_cases": []} + + passed, failed = await run_tests(filter_instance, test_data) + assert passed == 0 + assert failed == 0 + + @pytest.mark.asyncio + async def test_run_missing_test_cases_key(self): + """Test running with missing test_cases key.""" + filter_instance = load_filter_class( + "stac_auth_proxy.filters:Template", + ["true"], + {}, + ) + + test_data = {} + + passed, failed = await run_tests(filter_instance, test_data) + assert passed == 0 + assert failed == 0 + + +class TestPrintColored: + """Tests for print_colored function.""" + + def test_print_colored_output(self, capsys): + """Test colored output formatting.""" + print_colored("Test message", Colors.GREEN) + captured = capsys.readouterr() + assert f"{Colors.GREEN}Test message{Colors.RESET}" in captured.out + + def test_print_colored_different_colors(self, capsys): + """Test different color codes.""" + print_colored("Red text", Colors.RED) + captured = capsys.readouterr() + assert Colors.RED in captured.out + + print_colored("Blue text", Colors.BLUE) + captured = capsys.readouterr() + assert Colors.BLUE in captured.out + + +class TestMainCLI: + """Tests for main CLI entry point.""" + + def test_main_success(self, tmp_path): + """Test successful CLI execution.""" + # Create a simple test file + test_file = tmp_path / "test.yaml" + test_data = { + "test_cases": [ + { + "name": "Simple test", + "context": {"req": {}, "payload": None}, + "tests": [ + [{"id": "item1", "properties": {"private": False}}, True], + ], + } + ] + } + test_file.write_text(yaml.dump(test_data)) + + # Mock sys.argv + with patch.object( + sys, + "argv", + [ + "stac-auth-tests", + "--filter-class", + "stac_auth_proxy.filters:Template", + "--filter-args", + '["(properties.private = false)"]', + "--test-file", + str(test_file), + ], + ): + exit_code = main() + + assert exit_code == 0 + + def test_main_with_failures(self, tmp_path): + """Test CLI execution with test failures.""" + test_file = tmp_path / "test.yaml" + test_data = { + "test_cases": [ + { + "name": "Failing test", + "context": {"req": {}, "payload": None}, + "tests": [ + [{"id": "item1", "properties": {"private": False}}, False], + ], + } + ] + } + test_file.write_text(yaml.dump(test_data)) + + with patch.object( + sys, + "argv", + [ + "stac-auth-tests", + "--filter-class", + "stac_auth_proxy.filters:Template", + "--filter-args", + '["(properties.private = false)"]', + "--test-file", + str(test_file), + ], + ): + exit_code = main() + + assert exit_code == 1 + + def test_main_no_tests(self, tmp_path): + """Test CLI with no test cases.""" + test_file = tmp_path / "test.yaml" + test_data = {"test_cases": []} + test_file.write_text(yaml.dump(test_data)) + + with patch.object( + sys, + "argv", + [ + "stac-auth-tests", + "--filter-class", + "stac_auth_proxy.filters:Template", + "--filter-args", + '["true"]', + "--test-file", + str(test_file), + ], + ): + exit_code = main() + + assert exit_code == 1 + + def test_main_invalid_filter_args(self, tmp_path): + """Test CLI with invalid filter args JSON.""" + test_file = tmp_path / "test.yaml" + test_file.write_text("test_cases: []") + + with patch.object( + sys, + "argv", + [ + "stac-auth-tests", + "--filter-class", + "stac_auth_proxy.filters:Template", + "--filter-args", + "{invalid json}", + "--test-file", + str(test_file), + ], + ): + exit_code = main() + + assert exit_code == 1 + + def test_main_filter_args_not_array(self, tmp_path): + """Test CLI with filter args that is not an array.""" + test_file = tmp_path / "test.yaml" + test_file.write_text("test_cases: []") + + with patch.object( + sys, + "argv", + [ + "stac-auth-tests", + "--filter-class", + "stac_auth_proxy.filters:Template", + "--filter-args", + '{"not": "an array"}', + "--test-file", + str(test_file), + ], + ): + exit_code = main() + + assert exit_code == 1 + + def test_main_invalid_filter_kwargs(self, tmp_path): + """Test CLI with invalid filter kwargs JSON.""" + test_file = tmp_path / "test.yaml" + test_file.write_text("test_cases: []") + + with patch.object( + sys, + "argv", + [ + "stac-auth-tests", + "--filter-class", + "stac_auth_proxy.filters:Template", + "--filter-args", + '["arg"]', + "--filter-kwargs", + "{invalid json}", + "--test-file", + str(test_file), + ], + ): + exit_code = main() + + assert exit_code == 1 + + def test_main_filter_kwargs_not_object(self, tmp_path): + """Test CLI with filter kwargs that is not an object.""" + test_file = tmp_path / "test.yaml" + test_file.write_text("test_cases: []") + + with patch.object( + sys, + "argv", + [ + "stac-auth-tests", + "--filter-class", + "stac_auth_proxy.filters:Template", + "--filter-args", + '["arg"]', + "--filter-kwargs", + '["not", "an", "object"]', + "--test-file", + str(test_file), + ], + ): + exit_code = main() + + assert exit_code == 1 + + def test_main_invalid_filter_class_path(self, tmp_path): + """Test CLI with invalid filter class path.""" + test_file = tmp_path / "test.yaml" + test_file.write_text("test_cases: []") + + with patch.object( + sys, + "argv", + [ + "stac-auth-tests", + "--filter-class", + "invalid.path.without.colon", + "--filter-args", + "[]", + "--test-file", + str(test_file), + ], + ): + exit_code = main() + + assert exit_code == 1 + + def test_main_nonexistent_test_file(self, tmp_path): + """Test CLI with non-existent test file.""" + with patch.object( + sys, + "argv", + [ + "stac-auth-tests", + "--filter-class", + "stac_auth_proxy.filters:Template", + "--filter-args", + '["true"]', + "--test-file", + str(tmp_path / "nonexistent.yaml"), + ], + ): + exit_code = main() + + assert exit_code == 1 + + def test_main_with_kwargs(self, tmp_path): + """Test CLI with filter kwargs.""" + test_file = tmp_path / "test.yaml" + test_data = { + "test_cases": [ + { + "name": "Simple test", + "context": {"req": {}, "payload": None}, + "tests": [ + [{"id": "item1", "properties": {"private": False}}, True], + ], + } + ] + } + test_file.write_text(yaml.dump(test_data)) + + # Note: Template doesn't actually use kwargs, but we test the CLI handles them + with patch.object( + sys, + "argv", + [ + "stac-auth-tests", + "--filter-class", + "stac_auth_proxy.filters:Template", + "--filter-args", + '["(properties.private = false)"]', + "--filter-kwargs", + '{"some_kwarg": "value"}', + "--test-file", + str(test_file), + ], + ): + # Should not error even with unexpected kwargs + exit_code = main() + + # Template doesn't support kwargs, so this will raise TypeError + assert exit_code == 1 + + +class TestIntegrationWithExampleFile: + """Integration tests using the example auth rules file.""" + + def test_example_file_exists(self): + """Verify the example auth rules file exists.""" + example_file = Path(__file__).parent / "example_auth_rules.yaml" + if example_file.exists(): + # Load and validate it's valid YAML + data = load_test_file(example_file) + assert "test_cases" in data + assert len(data["test_cases"]) > 0 + + @pytest.mark.asyncio + async def test_run_against_example_template(self): + """Test running a simple template against structured test data.""" + # Create a filter that allows public items or items in allowed collections + filter_instance = load_filter_class( + "stac_auth_proxy.filters:Template", + [ + """ + {% if payload is none %} + (properties.private = false) + {% else %} + true + {% endif %} + """ + ], + {}, + ) + + test_data = { + "test_cases": [ + { + "name": "Anonymous user sees only public items", + "context": {"req": {}, "payload": None}, + "tests": [ + [ + { + "id": "public-item", + "properties": {"private": False}, + }, + True, + ], + [ + { + "id": "private-item", + "properties": {"private": True}, + }, + False, + ], + ], + }, + { + "name": "Authenticated user sees all items", + "context": {"req": {}, "payload": {"sub": "user123"}}, + "tests": [ + [ + { + "id": "public-item", + "properties": {"private": False}, + }, + True, + ], + [ + { + "id": "private-item", + "properties": {"private": True}, + }, + True, + ], + ], + }, + ] + } + + passed, failed = await run_tests(filter_instance, test_data) + assert passed == 2 + assert failed == 0 diff --git a/uv.lock b/uv.lock index a0c6b222..6510f8fe 100644 --- a/uv.lock +++ b/uv.lock @@ -1578,6 +1578,7 @@ dependencies = [ { name = "jinja2" }, { name = "pydantic-settings" }, { name = "pyjwt" }, + { name = "pyyaml" }, { name = "starlette-cramjam" }, { name = "uvicorn" }, ] @@ -1608,6 +1609,7 @@ dev = [ { name = "ruff" }, { name = "starlette-cramjam" }, { name = "types-attrs" }, + { name = "types-pyyaml" }, { name = "types-simplejson" }, ] @@ -1630,6 +1632,7 @@ requires-dist = [ { name = "mkdocstrings", extras = ["python"], marker = "extra == 'docs'", specifier = ">=0.30.0" }, { name = "pydantic-settings", specifier = ">=2.6.1" }, { name = "pyjwt", specifier = ">=2.10.1" }, + { name = "pyyaml", specifier = ">=6.0" }, { name = "starlette-cramjam", specifier = ">=0.4.0" }, { name = "uvicorn", specifier = ">=0.32.1" }, ] @@ -1647,6 +1650,7 @@ dev = [ { name = "ruff", specifier = ">=0.0.238" }, { name = "starlette-cramjam", specifier = ">=0.4.0" }, { name = "types-attrs" }, + { name = "types-pyyaml" }, { name = "types-simplejson" }, ] @@ -1734,6 +1738,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fc/c9/1ac000a4eeee5f9dcca7f97f5792d4866e3f08489b88c2dbc5f19e855358/types_attrs-19.1.0-py2.py3-none-any.whl", hash = "sha256:d11acf7a2531a7c52a740c30fa3eb8d01d3066c10d34c01ff5e59502caac5352", size = 6079, upload-time = "2021-06-06T15:28:56.12Z" }, ] +[[package]] +name = "types-pyyaml" +version = "6.0.12.20250915" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/7e/69/3c51b36d04da19b92f9e815be12753125bd8bc247ba0470a982e6979e71c/types_pyyaml-6.0.12.20250915.tar.gz", hash = "sha256:0f8b54a528c303f0e6f7165687dd33fafa81c807fcac23f632b63aa624ced1d3", size = 17522, upload-time = "2025-09-15T03:01:00.728Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bd/e0/1eed384f02555dde685fff1a1ac805c1c7dcb6dd019c916fe659b1c1f9ec/types_pyyaml-6.0.12.20250915-py3-none-any.whl", hash = "sha256:e7d4d9e064e89a3b3cae120b4990cd370874d2bf12fa5f46c97018dd5d3c9ab6", size = 20338, upload-time = "2025-09-15T03:00:59.218Z" }, +] + [[package]] name = "types-simplejson" version = "3.20.0.20250822"