From 203bbb4e3ee64914a2b7a9c4279c869863d9c95e Mon Sep 17 00:00:00 2001 From: Soham <010Soham@users.noreply.github.com> Date: Fri, 19 Dec 2025 11:49:10 +0000 Subject: [PATCH 1/2] Fix REST signer to use catalog auth manager --- pyiceberg/catalog/__init__.py | 1 + pyiceberg/catalog/rest/__init__.py | 25 ++++++++------- pyiceberg/io/fsspec.py | 16 ++++++++-- tests/io/test_fsspec.py | 51 ++++++++++++++++++++++++++++++ 4 files changed, 79 insertions(+), 14 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index a4f1d47bea..99e0bdce88 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -77,6 +77,7 @@ _ENV_CONFIG = Config() TOKEN = "token" +AUTH_MANAGER = "auth.manager" TYPE = "type" PY_CATALOG_IMPL = "py-catalog-impl" ICEBERG = "iceberg" diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 3b77fd47f0..e72f247ada 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -26,14 +26,7 @@ from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt from pyiceberg import __version__ -from pyiceberg.catalog import ( - BOTOCORE_SESSION, - TOKEN, - URI, - WAREHOUSE_LOCATION, - Catalog, - PropertiesUpdateSummary, -) +from pyiceberg.catalog import AUTH_MANAGER, BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager from pyiceberg.catalog.rest.response import _handle_non_200_response from pyiceberg.exceptions import ( @@ -49,7 +42,7 @@ TableAlreadyExistsError, UnauthorizedError, ) -from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN +from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, FileIO, load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids from pyiceberg.schema import Schema, assign_fresh_schema_ids from pyiceberg.table import ( @@ -214,6 +207,7 @@ class ListViewsResponse(IcebergBaseModel): class RestCatalog(Catalog): uri: str _session: Session + _auth_manager: AuthManager | None def __init__(self, name: str, **properties: str): """Rest Catalog. @@ -225,6 +219,7 @@ def __init__(self, name: str, **properties: str): properties: Properties that are passed along to the configuration. """ super().__init__(name, **properties) + self._auth_manager: AuthManager | None = None self.uri = properties[URI] self._fetch_config() self._session = self._create_session() @@ -259,9 +254,11 @@ def _create_session(self) -> Session: if auth_type != CUSTOM and auth_impl: raise ValueError("auth.impl can only be specified when using custom auth.type") - session.auth = AuthManagerAdapter(AuthManagerFactory.create(auth_impl or auth_type, auth_type_config)) + self._auth_manager = AuthManagerFactory.create(auth_impl or auth_type, auth_type_config) + session.auth = AuthManagerAdapter(self._auth_manager) else: - session.auth = AuthManagerAdapter(self._create_legacy_oauth2_auth_manager(session)) + self._auth_manager = self._create_legacy_oauth2_auth_manager(session) + session.auth = AuthManagerAdapter(self._auth_manager) # Configure SigV4 Request Signing if property_as_bool(self.properties, SIGV4, False): @@ -269,6 +266,12 @@ def _create_session(self) -> Session: return session + def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO: + merged_properties = {**self.properties, **properties} + if self._auth_manager: + merged_properties[AUTH_MANAGER] = self._auth_manager + return load_file_io(merged_properties, location) + def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager: """Create the LegacyOAuth2AuthManager by fetching required properties. diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 5898a22675..bb640908dc 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -36,7 +36,7 @@ from fsspec.implementations.local import LocalFileSystem from requests import HTTPError -from pyiceberg.catalog import TOKEN, URI +from pyiceberg.catalog import AUTH_MANAGER, TOKEN, URI from pyiceberg.exceptions import SignError from pyiceberg.io import ( ADLS_ACCOUNT_HOST, @@ -121,9 +121,19 @@ def __call__(self, request: "AWSRequest", **_: Any) -> None: signer_url = self.properties.get(S3_SIGNER_URI, self.properties[URI]).rstrip("/") # type: ignore signer_endpoint = self.properties.get(S3_SIGNER_ENDPOINT, S3_SIGNER_ENDPOINT_DEFAULT) - signer_headers = {} + signer_headers: dict[str, str] = {} + + auth_header: str | None = None if token := self.properties.get(TOKEN): - signer_headers = {"Authorization": f"Bearer {token}"} + auth_header = f"Bearer {token}" + elif auth_manager := self.properties.get(AUTH_MANAGER): + header = getattr(auth_manager, "auth_header", None) + if callable(header): + auth_header = header() + + if auth_header: + signer_headers["Authorization"] = auth_header + signer_headers.update(get_header_properties(self.properties)) signer_body = { diff --git a/tests/io/test_fsspec.py b/tests/io/test_fsspec.py index c28eb0714f..21e2846ceb 100644 --- a/tests/io/test_fsspec.py +++ b/tests/io/test_fsspec.py @@ -28,6 +28,7 @@ from fsspec.spec import AbstractFileSystem from requests_mock import Mocker +from pyiceberg.catalog import AUTH_MANAGER from pyiceberg.exceptions import SignError from pyiceberg.io import fsspec from pyiceberg.io.fsspec import FsspecFileIO, S3V4RestSigner @@ -948,3 +949,53 @@ def test_s3v4_rest_signer_forbidden(requests_mock: Mocker) -> None: """Failed to sign request 401: {'method': 'HEAD', 'region': 'us-west-2', 'uri': 'https://bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro', 'headers': {'User-Agent': ['Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0']}}""" in str(exc_info.value) ) + + +def test_s3v4_rest_signer_uses_auth_manager(requests_mock: Mocker) -> None: + new_uri = "https://bucket/metadata/snap-signed.avro" + requests_mock.post( + f"{TEST_URI}/v1/aws/s3/sign", + json={ + "uri": new_uri, + "headers": { + "Authorization": ["AWS4-HMAC-SHA256 Credential=ASIA.../s3/aws4_request, SignedHeaders=host, Signature=abc"], + "Host": ["bucket.s3.us-west-2.amazonaws.com"], + }, + "extensions": {}, + }, + status_code=200, + ) + + request = AWSRequest( + method="HEAD", + url="https://bucket/metadata/snap-foo.avro", + headers={"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0"}, + data=b"", + params={}, + auth_path="/metadata/snap-foo.avro", + ) + request.context = { + "client_region": "us-west-2", + "has_streaming_input": False, + "auth_type": None, + "signing": {"bucket": "bucket"}, + "retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"}, + } + + class DummyAuthManager: + def __init__(self) -> None: + self.calls = 0 + + def auth_header(self) -> str: + self.calls += 1 + return "Bearer via-manager" + + auth_manager = DummyAuthManager() + + signer = S3V4RestSigner(properties={AUTH_MANAGER: auth_manager, "uri": TEST_URI}) + signer(request) + + assert auth_manager.calls == 1 + assert requests_mock.last_request is not None + assert requests_mock.last_request.headers["Authorization"] == "Bearer via-manager" + assert request.url == new_uri From a99dcada431a06329959cb7f71bf961343256bb0 Mon Sep 17 00:00:00 2001 From: Soham <010Soham@users.noreply.github.com> Date: Sun, 21 Dec 2025 08:35:46 +0000 Subject: [PATCH 2/2] Apply ruff format after merge --- pyiceberg/catalog/rest/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 29827941d0..b65adb6af1 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -275,6 +275,7 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | Non if self._auth_manager: merged_properties[AUTH_MANAGER] = self._auth_manager return load_file_io(merged_properties, location) + def is_rest_scan_planning_enabled(self) -> bool: """Check if rest server-side scan planning is enabled.