From 74562d1627ea331f21a1ba5405daed9bfdbde5f4 Mon Sep 17 00:00:00 2001 From: geruh Date: Sat, 20 Dec 2025 00:57:21 -0800 Subject: [PATCH] feat: Allow servers to express supported endpoints with ConfigResponse --- pyiceberg/catalog/rest/__init__.py | 169 +++++++++++++++++++- tests/catalog/test_rest.py | 239 +++++++++++++++++++++++++++-- 2 files changed, 394 insertions(+), 14 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index a28ff562bd..f9a8d0a6fd 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -21,7 +21,7 @@ Union, ) -from pydantic import Field, field_validator +from pydantic import ConfigDict, Field, field_validator from requests import HTTPError, Session from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt @@ -76,6 +76,43 @@ import pyarrow as pa +class HttpMethod(str, Enum): + GET = "GET" + HEAD = "HEAD" + POST = "POST" + DELETE = "DELETE" + + +class Endpoint(IcebergBaseModel): + model_config = ConfigDict(frozen=True) + + http_method: HttpMethod = Field() + path: str = Field() + + @field_validator("path", mode="before") + @classmethod + def _validate_path(cls, raw_path: str) -> str: + if not raw_path: + raise ValueError("Invalid path: empty") + raw_path = raw_path.strip() + if not raw_path: + raise ValueError("Invalid path: empty") + return raw_path + + def __str__(self) -> str: + """Return the string representation of the Endpoint class.""" + return f"{self.http_method.value} {self.path}" + + @classmethod + def from_string(cls, endpoint: str | None) -> "Endpoint": + if endpoint is None: + raise ValueError("Invalid endpoint (must consist of 'METHOD /path'): None") + elements = endpoint.split(None, 1) + if len(elements) != 2: + raise ValueError(f"Invalid endpoint (must consist of two elements separated by a single space): {endpoint}") + return cls(http_method=HttpMethod(elements[0].upper()), path=elements[1]) + + class Endpoints: get_config: str = "config" list_namespaces: str = "namespaces" @@ -86,7 +123,7 @@ class Endpoints: namespace_exists: str = "namespaces/{namespace}" list_tables: str = "namespaces/{namespace}/tables" create_table: str = "namespaces/{namespace}/tables" - register_table = "namespaces/{namespace}/register" + register_table: str = "namespaces/{namespace}/register" load_table: str = "namespaces/{namespace}/tables/{table}" update_table: str = "namespaces/{namespace}/tables/{table}" drop_table: str = "namespaces/{namespace}/tables/{table}" @@ -100,6 +137,66 @@ class Endpoints: fetch_scan_tasks: str = "namespaces/{namespace}/tables/{table}/tasks" +class Capability: + V1_LIST_NAMESPACES = Endpoint(http_method=HttpMethod.GET, path="/v1/{prefix}/namespaces") + V1_LOAD_NAMESPACE = Endpoint(http_method=HttpMethod.GET, path="/v1/{prefix}/namespaces/{namespace}") + V1_NAMESPACE_EXISTS = Endpoint(http_method=HttpMethod.HEAD, path="/v1/{prefix}/namespaces/{namespace}") + V1_UPDATE_NAMESPACE = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/properties") + V1_CREATE_NAMESPACE = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces") + V1_DELETE_NAMESPACE = Endpoint(http_method=HttpMethod.DELETE, path="/v1/{prefix}/namespaces/{namespace}") + + V1_LIST_TABLES = Endpoint(http_method=HttpMethod.GET, path="/v1/{prefix}/namespaces/{namespace}/tables") + V1_LOAD_TABLE = Endpoint(http_method=HttpMethod.GET, path="/v1/{prefix}/namespaces/{namespace}/tables/{table}") + V1_TABLE_EXISTS = Endpoint(http_method=HttpMethod.HEAD, path="/v1/{prefix}/namespaces/{namespace}/tables/{table}") + V1_CREATE_TABLE = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/tables") + V1_UPDATE_TABLE = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/tables/{table}") + V1_DELETE_TABLE = Endpoint(http_method=HttpMethod.DELETE, path="/v1/{prefix}/namespaces/{namespace}/tables/{table}") + V1_RENAME_TABLE = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/tables/rename") + V1_REGISTER_TABLE = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/register") + + V1_LIST_VIEWS = Endpoint(http_method=HttpMethod.GET, path="/v1/{prefix}/namespaces/{namespace}/views") + V1_LOAD_VIEW = Endpoint(http_method=HttpMethod.GET, path="/v1/{prefix}/namespaces/{namespace}/views/{view}") + V1_VIEW_EXISTS = Endpoint(http_method=HttpMethod.HEAD, path="/v1/{prefix}/namespaces/{namespace}/views/{view}") + V1_CREATE_VIEW = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/views") + V1_UPDATE_VIEW = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/views/{view}") + V1_DELETE_VIEW = Endpoint(http_method=HttpMethod.DELETE, path="/v1/{prefix}/namespaces/{namespace}/views/{view}") + V1_RENAME_VIEW = Endpoint(http_method=HttpMethod.POST, path="/v1/{prefix}/views/rename") + V1_SUBMIT_TABLE_SCAN_PLAN = Endpoint( + http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan" + ) + V1_TABLE_SCAN_PLAN_TASKS = Endpoint( + http_method=HttpMethod.POST, path="/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks" + ) + + +# Default endpoints for backwards compatibility with legacy servers that don't return endpoints +# in ConfigResponse. Only includes namespace and table endpoints. +DEFAULT_ENDPOINTS: frozenset[Endpoint] = frozenset( + ( + Capability.V1_LIST_NAMESPACES, + Capability.V1_LOAD_NAMESPACE, + Capability.V1_CREATE_NAMESPACE, + Capability.V1_UPDATE_NAMESPACE, + Capability.V1_DELETE_NAMESPACE, + Capability.V1_LIST_TABLES, + Capability.V1_LOAD_TABLE, + Capability.V1_CREATE_TABLE, + Capability.V1_UPDATE_TABLE, + Capability.V1_DELETE_TABLE, + Capability.V1_RENAME_TABLE, + Capability.V1_REGISTER_TABLE, + ) +) + +# View endpoints conditionally added based on VIEW_ENDPOINTS_SUPPORTED property. +VIEW_ENDPOINTS: frozenset[Endpoint] = frozenset( + ( + Capability.V1_LIST_VIEWS, + Capability.V1_DELETE_VIEW, + ) +) + + class IdentifierKind(Enum): TABLE = "table" VIEW = "view" @@ -134,6 +231,8 @@ class IdentifierKind(Enum): CUSTOM = "custom" REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled" REST_SCAN_PLANNING_ENABLED_DEFAULT = False +VIEW_ENDPOINTS_SUPPORTED = "view-endpoints-supported" +VIEW_ENDPOINTS_SUPPORTED_DEFAULT = False NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8) @@ -180,6 +279,14 @@ class RegisterTableRequest(IcebergBaseModel): class ConfigResponse(IcebergBaseModel): defaults: Properties | None = Field(default_factory=dict) overrides: Properties | None = Field(default_factory=dict) + endpoints: set[Endpoint] | None = Field(default=None) + + @field_validator("endpoints", mode="before") + @classmethod + def _parse_endpoints(cls, v: list[str] | None) -> set[Endpoint] | None: + if v is None: + return None + return {Endpoint.from_string(s) for s in v} class ListNamespaceResponse(IcebergBaseModel): @@ -218,6 +325,7 @@ class ListViewsResponse(IcebergBaseModel): class RestCatalog(Catalog): uri: str _session: Session + _supported_endpoints: set[Endpoint] def __init__(self, name: str, **properties: str): """Rest Catalog. @@ -279,7 +387,9 @@ def is_rest_scan_planning_enabled(self) -> bool: Returns: True if enabled, False otherwise. """ - return property_as_bool(self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT) + return Capability.V1_SUBMIT_TABLE_SCAN_PLAN in self._supported_endpoints and property_as_bool( + self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT + ) def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager: """Create the LegacyOAuth2AuthManager by fetching required properties. @@ -327,6 +437,18 @@ def url(self, endpoint: str, prefixed: bool = True, **kwargs: Any) -> str: return url + endpoint.format(**kwargs) + def _check_endpoint(self, endpoint: Endpoint) -> None: + """Check if an endpoint is supported by the server. + + Args: + endpoint: The endpoint to check against the set of supported endpoints + + Raises: + NotImplementedError: If the endpoint is not supported. + """ + if endpoint not in self._supported_endpoints: + raise NotImplementedError(f"Server does not support endpoint: {endpoint}") + @property def auth_url(self) -> str: self._warn_oauth_tokens_deprecation() @@ -384,6 +506,17 @@ def _fetch_config(self) -> None: # Update URI based on overrides self.uri = config[URI] + # Determine supported endpoints + endpoints = config_response.endpoints + if endpoints: + self._supported_endpoints = set(endpoints) + else: + # Use default endpoints for legacy servers that don't return endpoints + self._supported_endpoints = set(DEFAULT_ENDPOINTS) + # Conditionally add view endpoints based on config + if property_as_bool(self.properties, VIEW_ENDPOINTS_SUPPORTED, VIEW_ENDPOINTS_SUPPORTED_DEFAULT): + self._supported_endpoints.update(VIEW_ENDPOINTS) + def _identifier_to_validated_tuple(self, identifier: str | Identifier) -> Identifier: identifier_tuple = self.identifier_to_tuple(identifier) if len(identifier_tuple) <= 1: @@ -503,6 +636,7 @@ def _create_table( properties: Properties = EMPTY_DICT, stage_create: bool = False, ) -> TableResponse: + self._check_endpoint(Capability.V1_CREATE_TABLE) iceberg_schema = self._convert_schema_if_needed( schema, int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore @@ -591,6 +725,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) - Raises: TableAlreadyExistsError: If the table already exists """ + self._check_endpoint(Capability.V1_REGISTER_TABLE) namespace_and_table = self._split_identifier_for_path(identifier) request = RegisterTableRequest( name=namespace_and_table["table"], @@ -611,6 +746,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) - @retry(**_RETRY_ARGS) def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + self._check_endpoint(Capability.V1_LIST_TABLES) namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat)) @@ -622,6 +758,7 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: @retry(**_RETRY_ARGS) def load_table(self, identifier: str | Identifier) -> Table: + self._check_endpoint(Capability.V1_LOAD_TABLE) params = {} if mode := self.properties.get(SNAPSHOT_LOADING_MODE): if mode in {"all", "refs"}: @@ -642,6 +779,7 @@ def load_table(self, identifier: str | Identifier) -> Table: @retry(**_RETRY_ARGS) def drop_table(self, identifier: str | Identifier, purge_requested: bool = False) -> None: + self._check_endpoint(Capability.V1_DELETE_TABLE) response = self._session.delete( self.url(Endpoints.drop_table, prefixed=True, **self._split_identifier_for_path(identifier)), params={"purgeRequested": purge_requested}, @@ -657,6 +795,7 @@ def purge_table(self, identifier: str | Identifier) -> None: @retry(**_RETRY_ARGS) def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table: + self._check_endpoint(Capability.V1_RENAME_TABLE) payload = { "source": self._split_identifier_for_json(from_identifier), "destination": self._split_identifier_for_json(to_identifier), @@ -692,6 +831,8 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm @retry(**_RETRY_ARGS) def list_views(self, namespace: str | Identifier) -> list[Identifier]: + if Capability.V1_LIST_VIEWS not in self._supported_endpoints: + return [] namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat)) @@ -720,6 +861,7 @@ def commit_table( CommitFailedException: Requirement not met, or a conflict with a concurrent commit. CommitStateUnknownException: Failed due to an internal exception on the side of the catalog. """ + self._check_endpoint(Capability.V1_UPDATE_TABLE) identifier = table.name() table_identifier = TableIdentifier(namespace=identifier[:-1], name=identifier[-1]) table_request = CommitTableRequest(identifier=table_identifier, requirements=requirements, updates=updates) @@ -749,6 +891,7 @@ def commit_table( @retry(**_RETRY_ARGS) def create_namespace(self, namespace: str | Identifier, properties: Properties = EMPTY_DICT) -> None: + self._check_endpoint(Capability.V1_CREATE_NAMESPACE) namespace_tuple = self._check_valid_namespace_identifier(namespace) payload = {"namespace": namespace_tuple, "properties": properties} response = self._session.post(self.url(Endpoints.create_namespace), json=payload) @@ -759,6 +902,7 @@ def create_namespace(self, namespace: str | Identifier, properties: Properties = @retry(**_RETRY_ARGS) def drop_namespace(self, namespace: str | Identifier) -> None: + self._check_endpoint(Capability.V1_DELETE_NAMESPACE) namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.delete(self.url(Endpoints.drop_namespace, namespace=namespace)) @@ -769,6 +913,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None: @retry(**_RETRY_ARGS) def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + self._check_endpoint(Capability.V1_LIST_NAMESPACES) namespace_tuple = self.identifier_to_tuple(namespace) response = self._session.get( self.url( @@ -786,6 +931,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: @retry(**_RETRY_ARGS) def load_namespace_properties(self, namespace: str | Identifier) -> Properties: + self._check_endpoint(Capability.V1_LOAD_NAMESPACE) namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.get(self.url(Endpoints.load_namespace_metadata, namespace=namespace)) @@ -800,6 +946,7 @@ def load_namespace_properties(self, namespace: str | Identifier) -> Properties: def update_namespace_properties( self, namespace: str | Identifier, removals: set[str] | None = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: + self._check_endpoint(Capability.V1_UPDATE_NAMESPACE) namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) payload = {"removals": list(removals or []), "updates": updates} @@ -819,6 +966,14 @@ def update_namespace_properties( def namespace_exists(self, namespace: str | Identifier) -> bool: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) + + if Capability.V1_NAMESPACE_EXISTS not in self._supported_endpoints: + try: + self.load_namespace_properties(namespace_tuple) + return True + except NoSuchNamespaceError: + return False + response = self._session.head(self.url(Endpoints.namespace_exists, namespace=namespace)) if response.status_code == 404: @@ -843,6 +998,13 @@ def table_exists(self, identifier: str | Identifier) -> bool: Returns: bool: True if the table exists, False otherwise. """ + if Capability.V1_TABLE_EXISTS not in self._supported_endpoints: + try: + self.load_table(identifier) + return True + except NoSuchTableError: + return False + response = self._session.head( self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier)) ) @@ -886,6 +1048,7 @@ def view_exists(self, identifier: str | Identifier) -> bool: @retry(**_RETRY_ARGS) def drop_view(self, identifier: str) -> None: + self._check_endpoint(Capability.V1_DELETE_VIEW) response = self._session.delete( self.url(Endpoints.drop_view, prefixed=True, **self._split_identifier_for_path(identifier, IdentifierKind.VIEW)), ) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 464314f3be..03bef31070 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -27,7 +27,7 @@ import pyiceberg from pyiceberg.catalog import PropertiesUpdateSummary, load_catalog -from pyiceberg.catalog.rest import OAUTH2_SERVER_URI, SNAPSHOT_LOADING_MODE, RestCatalog +from pyiceberg.catalog.rest import DEFAULT_ENDPOINTS, OAUTH2_SERVER_URI, SNAPSHOT_LOADING_MODE, Capability, RestCatalog from pyiceberg.exceptions import ( AuthorizationExpiredError, NamespaceAlreadyExistsError, @@ -457,7 +457,9 @@ def test_list_views_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) == [("examples", "fooshare")] + assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"view-endpoints-supported": "true"}).list_views(namespace) == [ + ("examples", "fooshare") + ] def test_list_views_200_sigv4(rest_mock: Mocker) -> None: @@ -469,9 +471,9 @@ def test_list_views_200_sigv4(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_views(namespace) == [ - ("examples", "fooshare") - ] + assert RestCatalog( + "rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true", "view-endpoints-supported": "true"} + ).list_views(namespace) == [("examples", "fooshare")] assert rest_mock.called @@ -490,7 +492,7 @@ def test_list_views_404(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) with pytest.raises(NoSuchNamespaceError) as e: - RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) + RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"view-endpoints-supported": "true"}).list_views(namespace) assert "Namespace does not exist" in str(e.value) @@ -502,7 +504,7 @@ def test_view_exists_204(rest_mock: Mocker) -> None: status_code=204, request_headers=TEST_HEADERS, ) - catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"view-endpoints-supported": "true"}) assert catalog.view_exists((namespace, view)) @@ -514,7 +516,7 @@ def test_view_exists_404(rest_mock: Mocker) -> None: status_code=404, request_headers=TEST_HEADERS, ) - catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"view-endpoints-supported": "true"}) assert not catalog.view_exists((namespace, view)) @@ -782,6 +784,7 @@ def test_namespace_exists_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + catalog._supported_endpoints.add(Capability.V1_NAMESPACE_EXISTS) assert catalog.namespace_exists("fokko") @@ -793,6 +796,7 @@ def test_namespace_exists_204(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + catalog._supported_endpoints.add(Capability.V1_NAMESPACE_EXISTS) assert catalog.namespace_exists("fokko") @@ -804,6 +808,7 @@ def test_namespace_exists_404(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + catalog._supported_endpoints.add(Capability.V1_NAMESPACE_EXISTS) assert not catalog.namespace_exists("fokko") @@ -815,6 +820,7 @@ def test_namespace_exists_500(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + catalog._supported_endpoints.add(Capability.V1_NAMESPACE_EXISTS) with pytest.raises(ServerError): catalog.namespace_exists("fokko") @@ -957,6 +963,15 @@ def test_load_table_404(rest_mock: Mocker) -> None: def test_table_exists_200(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(Capability.V1_TABLE_EXISTS)], + }, + status_code=200, + ) rest_mock.head( f"{TEST_URI}v1/namespaces/fokko/tables/table", status_code=200, @@ -967,6 +982,15 @@ def test_table_exists_200(rest_mock: Mocker) -> None: def test_table_exists_204(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(Capability.V1_TABLE_EXISTS)], + }, + status_code=200, + ) rest_mock.head( f"{TEST_URI}v1/namespaces/fokko/tables/table", status_code=204, @@ -977,6 +1001,15 @@ def test_table_exists_204(rest_mock: Mocker) -> None: def test_table_exists_404(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(Capability.V1_TABLE_EXISTS)], + }, + status_code=200, + ) rest_mock.head( f"{TEST_URI}v1/namespaces/fokko/tables/table", status_code=404, @@ -987,6 +1020,15 @@ def test_table_exists_404(rest_mock: Mocker) -> None: def test_table_exists_500(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(Capability.V1_TABLE_EXISTS)], + }, + status_code=200, + ) rest_mock.head( f"{TEST_URI}v1/namespaces/fokko/tables/table", status_code=500, @@ -1339,6 +1381,15 @@ def test_delete_table_from_self_identifier_204( def test_rename_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]) -> None: + rest_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(Capability.V1_NAMESPACE_EXISTS), str(Capability.V1_RENAME_TABLE), str(Capability.V1_LOAD_TABLE)], + }, + status_code=200, + ) rest_mock.post( f"{TEST_URI}v1/tables/rename", json={ @@ -1377,6 +1428,15 @@ def test_rename_table_200(rest_mock: Mocker, example_table_metadata_with_snapsho def test_rename_table_from_self_identifier_200( rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any] ) -> None: + rest_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(Capability.V1_NAMESPACE_EXISTS), str(Capability.V1_RENAME_TABLE), str(Capability.V1_LOAD_TABLE)], + }, + status_code=200, + ) rest_mock.get( f"{TEST_URI}v1/namespaces/pdames/tables/source", json=example_table_metadata_with_snapshot_v1_rest_json, @@ -1420,6 +1480,15 @@ def test_rename_table_from_self_identifier_200( def test_rename_table_source_namespace_does_not_exist(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(Capability.V1_NAMESPACE_EXISTS), str(Capability.V1_RENAME_TABLE), str(Capability.V1_LOAD_TABLE)], + }, + status_code=200, + ) catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) from_identifier = ("invalid", "source") to_identifier = ("pdames", "destination") @@ -1441,6 +1510,15 @@ def test_rename_table_source_namespace_does_not_exist(rest_mock: Mocker) -> None def test_rename_table_destination_namespace_does_not_exist(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(Capability.V1_NAMESPACE_EXISTS), str(Capability.V1_RENAME_TABLE), str(Capability.V1_LOAD_TABLE)], + }, + status_code=200, + ) catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) from_identifier = ("pdames", "source") to_identifier = ("invalid", "destination") @@ -1824,6 +1902,15 @@ def test_table_identifier_in_commit_table_request( def test_drop_view_invalid_namespace(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(Capability.V1_DELETE_VIEW)], + }, + status_code=200, + ) view = "view" with pytest.raises(NoSuchIdentifierError) as e: # Missing namespace @@ -1833,6 +1920,15 @@ def test_drop_view_invalid_namespace(rest_mock: Mocker) -> None: def test_drop_view_404(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(Capability.V1_DELETE_VIEW)], + }, + status_code=200, + ) rest_mock.delete( f"{TEST_URI}v1/namespaces/some_namespace/views/does_not_exists", json={ @@ -1852,6 +1948,15 @@ def test_drop_view_404(rest_mock: Mocker) -> None: def test_drop_view_204(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(Capability.V1_DELETE_VIEW)], + }, + status_code=200, + ) rest_mock.delete( f"{TEST_URI}v1/namespaces/some_namespace/views/some_view", json={}, @@ -2007,7 +2112,11 @@ def test_rest_scan_planning_disabled_by_default(self, rest_mock: Mocker) -> None def test_rest_scan_planning_enabled_by_property(self, rest_mock: Mocker) -> None: rest_mock.get( f"{TEST_URI}v1/config", - json={"defaults": {}, "overrides": {}}, + json={ + "defaults": {}, + "overrides": {}, + "endpoints": ["POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"], + }, status_code=200, ) catalog = RestCatalog( @@ -2019,12 +2128,31 @@ def test_rest_scan_planning_enabled_by_property(self, rest_mock: Mocker) -> None assert catalog.is_rest_scan_planning_enabled() is True - def test_rest_scan_planning_explicitly_disabled(self, rest_mock: Mocker) -> None: + def test_rest_scan_planning_disabled_without_endpoint_support(self, rest_mock: Mocker) -> None: rest_mock.get( f"{TEST_URI}v1/config", json={"defaults": {}, "overrides": {}}, status_code=200, ) + catalog = RestCatalog( + "rest", + uri=TEST_URI, + token=TEST_TOKEN, + **{"rest-scan-planning-enabled": "true"}, + ) + + assert catalog.is_rest_scan_planning_enabled() is False + + def test_rest_scan_planning_explicitly_disabled(self, rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": ["POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"], + }, + status_code=200, + ) catalog = RestCatalog( "rest", uri=TEST_URI, @@ -2037,9 +2165,98 @@ def test_rest_scan_planning_explicitly_disabled(self, rest_mock: Mocker) -> None def test_rest_scan_planning_enabled_from_server_config(self, rest_mock: Mocker) -> None: rest_mock.get( f"{TEST_URI}v1/config", - json={"defaults": {"rest-scan-planning-enabled": "true"}, "overrides": {}}, + json={ + "defaults": {"rest-scan-planning-enabled": "true"}, + "overrides": {}, + "endpoints": ["POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"], + }, status_code=200, ) catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) assert catalog.is_rest_scan_planning_enabled() is True + + def test_supported_endpoint(self, requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": ["GET /v1/{prefix}/namespaces", "GET /v1/{prefix}/namespaces/{namespace}/tables"], + }, + status_code=200, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token="token") + + # Should not raise since these endpoints are in the supported set + catalog._check_endpoint(Capability.V1_LIST_NAMESPACES) + catalog._check_endpoint(Capability.V1_LIST_TABLES) + + def test_unsupported_endpoint(self, requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": ["GET /v1/{prefix}/namespaces"], + }, + status_code=200, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token="token") + + with pytest.raises(NotImplementedError, match="Server does not support endpoint"): + catalog._check_endpoint(Capability.V1_LIST_TABLES) + + def test_config_returns_invalid_endpoint(self, requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": ["INVALID_ENDPOINT"], + }, + status_code=200, + ) + + with pytest.raises(ValueError, match="Invalid endpoint"): + RestCatalog("rest", uri=TEST_URI, token="token") + + def test_default_endpoints_used_when_none_returned(self, requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={"defaults": {}, "overrides": {}}, + status_code=200, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token="token") + + # Should not raise for default endpoints + for endpoint in DEFAULT_ENDPOINTS: + catalog._check_endpoint(endpoint) + + def test_view_endpoints_not_included_by_default(self, requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={"defaults": {}, "overrides": {}}, + status_code=200, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token="token") + + with pytest.raises(NotImplementedError, match="Server does not support endpoint"): + catalog._check_endpoint(Capability.V1_LIST_VIEWS) + + def test_view_endpoints_enabled_with_config(self, requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={"defaults": {}, "overrides": {}}, + status_code=200, + ) + catalog = RestCatalog( + "rest", + uri=TEST_URI, + token="token", + **{"view-endpoints-supported": "true"}, + ) + + # View endpoints should be supported when enabled + catalog._check_endpoint(Capability.V1_LIST_VIEWS) + catalog._check_endpoint(Capability.V1_DELETE_VIEW)