diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java index 79617b2982ff..eb80158b0cab 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java @@ -18,6 +18,9 @@ */ package org.apache.iceberg.rest; +import java.util.Arrays; +import java.util.stream.Collectors; + public final class RESTCatalogProperties { private RESTCatalogProperties() {} @@ -37,12 +40,107 @@ private RESTCatalogProperties() {} public static final String NAMESPACE_SEPARATOR = "namespace-separator"; - // Enable planning on the REST server side - public static final String REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled"; - public static final boolean REST_SCAN_PLANNING_ENABLED_DEFAULT = false; + // Configure scan planning mode + // Can be set by server in LoadTableResponse.config() or by client in catalog properties + // Negotiation rules: ONLY beats PREFERRED, both PREFERRED = client wins + // Default when neither client nor server provides: client-preferred + public static final String SCAN_PLANNING_MODE = "scan-planning-mode"; + public static final String SCAN_PLANNING_MODE_DEFAULT = + ScanPlanningMode.CLIENT_PREFERRED.modeName(); public enum SnapshotMode { ALL, REFS } + + /** + * Enum to represent scan planning mode configuration. + * + *

Can be configured by: + * + *

    + *
  • Server: returned in LoadTableResponse.config() + *
  • Client: set in catalog properties + *
+ * + *

When both client and server configure this property, the values are negotiated: + * + *

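Values: + * + *

    + *
  • client-only: must use client-side planning + *
  • client-preferred (default): prefer client-side planning + *
  • catalog-preferred: prefer server-side planning; falls back to client-side planning when + * the server does not advertise the planning endpoint + *
  • catalog-only: must use server-side planning; fails when the server does not advertise + * the planning endpoint + *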
+ * + *

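Negotiation rules when both sides are configured: + * + *

    + *
  • CLIENT_ONLY paired with CATALOG_ONLY: incompatible, fails + *
  • One side ONLY, the other PREFERRED: the ONLY mode wins + *
  • Both PREFERRED (different types): the client mode wins + *
  • Both the same: that mode is used + *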
+ */ + public enum ScanPlanningMode { + CLIENT_ONLY("client-only"), + CLIENT_PREFERRED("client-preferred"), + CATALOG_PREFERRED("catalog-preferred"), + CATALOG_ONLY("catalog-only"); + + private final String modeName; + + ScanPlanningMode(String modeName) { + this.modeName = modeName; + } + + public String modeName() { + return modeName; + } + + public boolean isClientOnly() { + return this == CLIENT_ONLY; + } + + public boolean isCatalogOnly() { + return this == CATALOG_ONLY; + } + + public boolean isOnly() { + return this == CLIENT_ONLY || this == CATALOG_ONLY; + } + + public boolean isPreferred() { + return this == CLIENT_PREFERRED || this == CATALOG_PREFERRED; + } + + public boolean prefersClient() { + return this == CLIENT_ONLY || this == CLIENT_PREFERRED; + } + + public boolean prefersCatalog() { + return this == CATALOG_ONLY || this == CATALOG_PREFERRED; + } + + public static ScanPlanningMode fromString(String mode) { + if (mode == null) { + return CLIENT_PREFERRED; + } + for (ScanPlanningMode planningMode : values()) { + if (planningMode.modeName.equalsIgnoreCase(mode)) { + return planningMode; + } + } + String validModes = + Arrays.stream(values()).map(ScanPlanningMode::modeName).collect(Collectors.joining(", ")); + throw new IllegalArgumentException( + String.format("Invalid scan planning mode: %s. 
Valid values are: %s", mode, validModes)); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 61e25d3d4fc6..f3eb746f193a 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -159,7 +159,8 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private MetricsReporter reporter = null; private boolean reportingViaRestEnabled; private Integer pageSize = null; - private boolean restScanPlanningEnabled; + private RESTCatalogProperties.ScanPlanningMode clientConfiguredScanPlanningMode; + private RESTCatalogProperties.ScanPlanningMode catalogLevelScanPlanningMode; private CloseableGroup closeables = null; private Set endpoints; private Supplier> mutationHeaders = Map::of; @@ -272,11 +273,24 @@ public void initialize(String name, Map unresolved) { RESTCatalogProperties.NAMESPACE_SEPARATOR, RESTUtil.NAMESPACE_SEPARATOR_URLENCODED_UTF_8); - this.restScanPlanningEnabled = - PropertyUtil.propertyAsBoolean( - mergedProps, - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED_DEFAULT); + // Client-configured scan planning mode (from catalog properties, not from server config) + // Read from un-merged properties to avoid picking up server-provided defaults + String clientScanPlanningConfig = + PropertyUtil.propertyAsString(props, RESTCatalogProperties.SCAN_PLANNING_MODE, null); + this.clientConfiguredScanPlanningMode = + clientScanPlanningConfig != null + ? 
RESTCatalogProperties.ScanPlanningMode.fromString(clientScanPlanningConfig) + : null; + + // Also store the catalog-level (possibly server-provided) scan planning mode as a fallback + // This comes from ConfigResponse.overrides() and gets merged into mergedProps + String catalogLevelConfig = + PropertyUtil.propertyAsString(mergedProps, RESTCatalogProperties.SCAN_PLANNING_MODE, null); + this.catalogLevelScanPlanningMode = + catalogLevelConfig != null + ? RESTCatalogProperties.ScanPlanningMode.fromString(catalogLevelConfig) + : null; + super.initialize(name, mergedProps); } @@ -486,7 +500,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { // RestTable should only be returned for non-metadata tables, because client would // not have access to metadata files for example manifests, since all it needs is catalog. if (metadataType == null) { - RESTTable restTable = restTableForScanPlanning(ops, finalIdentifier, tableClient); + RESTTable restTable = restTableForScanPlanning(ops, finalIdentifier, tableClient, tableConf); if (restTable != null) { return restTable; } @@ -505,9 +519,35 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { } private RESTTable restTableForScanPlanning( - TableOperations ops, TableIdentifier finalIdentifier, RESTClient restClient) { - // server supports remote planning endpoint and server / client wants to do server side planning - if (endpoints.contains(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN) && restScanPlanningEnabled) { + TableOperations ops, + TableIdentifier finalIdentifier, + RESTClient restClient, + Map tableConf) { + // Get client-configured mode (set in catalog properties during initialization) + RESTCatalogProperties.ScanPlanningMode clientMode = clientConfiguredScanPlanningMode; + + // Get server-provided mode + // Priority: table-level config > catalog-level config (from ConfigResponse) + String tableLevelModeConfig = tableConf.get(RESTCatalogProperties.SCAN_PLANNING_MODE); + 
RESTCatalogProperties.ScanPlanningMode serverMode; + if (tableLevelModeConfig != null) { + serverMode = RESTCatalogProperties.ScanPlanningMode.fromString(tableLevelModeConfig); + } else { + // Fall back to catalog-level server config (from ConfigResponse.overrides()) + serverMode = catalogLevelScanPlanningMode; + } + + // Check server capabilities + boolean serverSupportsPlanning = endpoints.contains(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN); + + // Negotiate scan planning strategy + // Rules: ONLY beats PREFERRED, both PREFERRED = client wins, one side only = use it + ScanPlanningNegotiator.PlanningDecision decision = + ScanPlanningNegotiator.negotiate( + clientMode, serverMode, serverSupportsPlanning, finalIdentifier); + + // Apply the decision + if (decision == ScanPlanningNegotiator.PlanningDecision.USE_CATALOG_PLANNING) { return new RESTTable( ops, fullTableName(finalIdentifier), @@ -518,6 +558,8 @@ private RESTTable restTableForScanPlanning( paths, endpoints); } + + // USE_CLIENT_PLANNING return null; } @@ -589,7 +631,7 @@ public Table registerTable( trackFileIO(ops); - RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient); + RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient, tableConf); if (restTable != null) { return restTable; } @@ -858,7 +900,7 @@ public Table create() { trackFileIO(ops); - RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient); + RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient, tableConf); if (restTable != null) { return restTable; } diff --git a/core/src/main/java/org/apache/iceberg/rest/ScanPlanningNegotiator.java b/core/src/main/java/org/apache/iceberg/rest/ScanPlanningNegotiator.java new file mode 100644 index 000000000000..0c07fcc8489f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/ScanPlanningNegotiator.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.RESTCatalogProperties.ScanPlanningMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles negotiation between client and server scan planning mode configurations. + * + *

This class encapsulates the decision logic for determining whether to use client-side or + * server-side scan planning based on: + * + *

    + *
  • Client mode (CLIENT_ONLY, CLIENT_PREFERRED, CATALOG_PREFERRED, CATALOG_ONLY, or null) + *
  • Server mode (CLIENT_ONLY, CLIENT_PREFERRED, CATALOG_PREFERRED, CATALOG_ONLY, or null) + *
  • Server capabilities (supports planning endpoint or not) + *
+ * + *

Negotiation Rules + * + *

When both client and server configure scan planning mode, the values are negotiated: + * + *

    + *
  • Incompatible hard requirements: CLIENT_ONLY + CATALOG_ONLY = FAIL + *
  • ONLY wins over PREFERRED: When one side has "ONLY" and the other has "PREFERRED", + * the ONLY requirement wins (inflexible beats flexible) + *
  • Both PREFERRED: When both are PREFERRED (different types), client config wins + *
  • Both same: When both have the same value, use that planning type + *
  • Only one configured: Use the configured side (client or server) + *
  • Neither configured: Use default (CLIENT_PREFERRED) + *
+ */ +public class ScanPlanningNegotiator { + private static final Logger LOG = LoggerFactory.getLogger(ScanPlanningNegotiator.class); + + private ScanPlanningNegotiator() {} + + /** Result of scan planning negotiation. */ + public enum PlanningDecision { + USE_CLIENT_PLANNING, + USE_CATALOG_PLANNING + } + + /** + * Negotiates scan planning strategy between client and server configurations. + * + *

Resolution: when both modes are set they are negotiated (an ONLY mode beats a PREFERRED mode; + * the client mode wins only when both are PREFERRED); when only one is set it is used as-is; + * when neither is set the default (CLIENT_PREFERRED) applies. + * + * @param clientMode the client's scan planning mode (from catalog properties), may be null + * @param serverMode the server's scan planning mode (from LoadTableResponse.config(), or the + * catalog-level config when the table response sets none), may be null + * @param serverSupportsPlanning whether the server advertises the scan planning endpoint + * @param tableIdentifier the table identifier (for error messages and logging) + * @return the negotiated planning decision + * @throws IllegalStateException if client and server requirements are incompatible (CLIENT_ONLY + * vs CATALOG_ONLY) + * @throws UnsupportedOperationException if server-side planning is required but the server + * doesn't support it + */ + public static PlanningDecision negotiate( + ScanPlanningMode clientMode, + ScanPlanningMode serverMode, + boolean serverSupportsPlanning, + TableIdentifier tableIdentifier) { + + // Determine effective mode through negotiation + ScanPlanningMode effectiveMode; + String modeSource; + + if (clientMode != null && serverMode != null) { + // Both client and server have configured modes - negotiate + effectiveMode = negotiateBetweenClientAndServer(clientMode, serverMode, tableIdentifier); + modeSource = + String.format( + "negotiated (client: %s, server: %s)", clientMode.modeName(), serverMode.modeName()); + } else if (clientMode != null) { + // Only client configured + effectiveMode = clientMode; + modeSource = "client config"; + } else if (serverMode != null) { + // Only server configured + effectiveMode = serverMode; + modeSource = "server config"; + } else { + // Neither configured, use default + effectiveMode = ScanPlanningMode.CLIENT_PREFERRED; + modeSource = "default"; + } + + // Apply the effective mode + return applyMode(effectiveMode, serverSupportsPlanning, tableIdentifier, modeSource); + } + + /** + * Negotiate between client and server modes when both are configured. + * + *

Rules: + * + *

    + *
  • Both same = Use that mode + *
  • Both ONLY but want opposite planning = FAIL (incompatible) + *
  • One ONLY = ONLY wins (inflexible beats flexible) + *
  • Both PREFERRED = Client config wins + *
+ * + * @return the negotiated mode + * @throws IllegalStateException if both are ONLY but want opposite planning locations + */ + private static ScanPlanningMode negotiateBetweenClientAndServer( + ScanPlanningMode clientMode, ScanPlanningMode serverMode, TableIdentifier tableIdentifier) { + + // Fast path: both are the same - no negotiation needed + if (clientMode == serverMode) { + LOG.debug( + "Client and server agree on scan planning mode {} for table {}", + clientMode.modeName(), + tableIdentifier); + return clientMode; + } + + // Check for incompatible hard requirements: both are ONLY but want opposite planning locations + if (clientMode.isOnly() && serverMode.isOnly()) { + // Both have hard requirements but want different things + throw new IllegalStateException( + String.format( + "Incompatible scan planning requirements for table %s: " + + "client requires %s but server requires %s. " + + "Either change client config or update server mode.", + tableIdentifier, + clientMode.prefersClient() ? "client-side planning" : "server-side planning", + serverMode.prefersClient() ? "client-side planning" : "server-side planning")); + } + + // ONLY wins over PREFERRED (inflexible beats flexible) + if (clientMode.isOnly()) { + LOG.debug( + "Client mode {} (hard requirement) wins over server mode {} (flexible) for table {}", + clientMode.modeName(), + serverMode.modeName(), + tableIdentifier); + return clientMode; + } + + if (serverMode.isOnly()) { + LOG.debug( + "Server mode {} (hard requirement) wins over client mode {} (flexible) for table {}", + serverMode.modeName(), + clientMode.modeName(), + tableIdentifier); + return serverMode; + } + + // Both are PREFERRED - client config wins + LOG.debug( + "Both client ({}) and server ({}) are flexible (PREFERRED). Client config wins for table {}", + clientMode.modeName(), + serverMode.modeName(), + tableIdentifier); + return clientMode; + } + + /** + * Apply the effective mode and determine planning decision. 
+ * + * @return the planning decision based on effective mode and server capabilities + * @throws UnsupportedOperationException if CATALOG_ONLY but server doesn't support planning + */ + private static PlanningDecision applyMode( + ScanPlanningMode effectiveMode, + boolean serverSupportsPlanning, + TableIdentifier tableIdentifier, + String modeSource) { + + switch (effectiveMode) { + case CLIENT_ONLY: + case CLIENT_PREFERRED: + LOG.debug( + "Using client-side planning for table {} (mode: {}, source: {})", + tableIdentifier, + effectiveMode.modeName(), + modeSource); + return PlanningDecision.USE_CLIENT_PLANNING; + + case CATALOG_PREFERRED: + // Prefer server-side, but fall back to client if unavailable + if (!serverSupportsPlanning) { + LOG.warn( + "Table {} prefers server-side planning (mode: CATALOG_PREFERRED from {}) " + + "but server doesn't support it. Falling back to client-side planning. " + + "Consider upgrading server or changing mode to CLIENT_PREFERRED.", + tableIdentifier, + modeSource); + return PlanningDecision.USE_CLIENT_PLANNING; + } + LOG.debug( + "Using server-side planning for table {} (mode: CATALOG_PREFERRED, source: {})", + tableIdentifier, + modeSource); + return PlanningDecision.USE_CATALOG_PLANNING; + + case CATALOG_ONLY: + // Must use server-side, fail if unavailable + if (!serverSupportsPlanning) { + throw new UnsupportedOperationException( + String.format( + "Scan planning mode requires server-side planning (CATALOG_ONLY from %s) " + + "for table %s but server does not support planning endpoint. 
" + + "Either change scan planning mode or upgrade server to support scan planning.", + modeSource, tableIdentifier)); + } + LOG.debug( + "Using server-side planning for table {} (mode: CATALOG_ONLY, source: {})", + tableIdentifier, + modeSource); + return PlanningDecision.USE_CATALOG_PLANNING; + + default: + throw new IllegalStateException("Unknown scan planning mode: " + effectiveMode); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index df4ba3214aea..4ac59532c89d 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -3528,4 +3528,40 @@ private static List allRequests(RESTCatalogAdapter adapter) { verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(), any()); return captor.getAllValues(); } + + @Test + public void scanPlanningModeFromString() { + // Null returns CLIENT_PREFERRED default + assertThat(RESTCatalogProperties.ScanPlanningMode.fromString(null)) + .isEqualTo(RESTCatalogProperties.ScanPlanningMode.CLIENT_PREFERRED); + + // Valid mode names + assertThat(RESTCatalogProperties.ScanPlanningMode.fromString("client-only")) + .isEqualTo(RESTCatalogProperties.ScanPlanningMode.CLIENT_ONLY); + assertThat(RESTCatalogProperties.ScanPlanningMode.fromString("client-preferred")) + .isEqualTo(RESTCatalogProperties.ScanPlanningMode.CLIENT_PREFERRED); + assertThat(RESTCatalogProperties.ScanPlanningMode.fromString("catalog-preferred")) + .isEqualTo(RESTCatalogProperties.ScanPlanningMode.CATALOG_PREFERRED); + assertThat(RESTCatalogProperties.ScanPlanningMode.fromString("catalog-only")) + .isEqualTo(RESTCatalogProperties.ScanPlanningMode.CATALOG_ONLY); + + // Case-insensitive parsing + assertThat(RESTCatalogProperties.ScanPlanningMode.fromString("CLIENT-ONLY")) + .isEqualTo(RESTCatalogProperties.ScanPlanningMode.CLIENT_ONLY); + 
assertThat(RESTCatalogProperties.ScanPlanningMode.fromString("Client-Preferred")) + .isEqualTo(RESTCatalogProperties.ScanPlanningMode.CLIENT_PREFERRED); + assertThat(RESTCatalogProperties.ScanPlanningMode.fromString("CATALOG-PREFERRED")) + .isEqualTo(RESTCatalogProperties.ScanPlanningMode.CATALOG_PREFERRED); + assertThat(RESTCatalogProperties.ScanPlanningMode.fromString("CaTaLoG-oNlY")) + .isEqualTo(RESTCatalogProperties.ScanPlanningMode.CATALOG_ONLY); + + // Invalid mode throws exception with all valid modes listed + assertThatThrownBy(() -> RESTCatalogProperties.ScanPlanningMode.fromString("invalid-mode")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid scan planning mode: invalid-mode") + .hasMessageContaining("client-only") + .hasMessageContaining("client-preferred") + .hasMessageContaining("catalog-preferred") + .hasMessageContaining("catalog-only"); + } } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java index f84197b0f16e..fbc375766ed2 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java @@ -72,6 +72,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.gzip.GzipHandler; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -123,7 +124,7 @@ public T execute( .collect(Collectors.toList())) .withOverrides( ImmutableMap.of( - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true")) + RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog-preferred")) .build()); } Object body = roundTripSerialize(request.body(), "request"); @@ -922,8 +923,8 @@ public T 
execute( ImmutableMap.of( CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO", - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, - "true")); + RESTCatalogProperties.SCAN_PLANNING_MODE, + "catalog-preferred")); return new CatalogWithAdapter(catalog, adapter); } @@ -945,6 +946,75 @@ public void serverDoesNotSupportPlanningEndpoint() throws IOException { .isEqualTo(FILE_A.location()); } + @Test + public void scanPlanningModeRequired() throws IOException { + // Test catalog-only mode - should throw exception when server doesn't support planning + // Create a catalog that doesn't advertise scan planning endpoints + RESTCatalogAdapter adapter = + Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + if (ResourcePaths.config().equals(request.path())) { + // Return config without scan planning endpoints + return castResponse( + responseType, + ConfigResponse.builder().withEndpoints(baseCatalogEndpoints()).build()); + } + return super.execute(request, responseType, errorHandler, responseHeaders); + } + }); + + RESTCatalog catalog = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); + catalog.initialize( + "test-catalog-only", + ImmutableMap.of( + RESTCatalogProperties.SCAN_PLANNING_MODE, + "catalog-only", + CatalogProperties.FILE_IO_IMPL, + "org.apache.iceberg.inmemory.InMemoryFileIO")); + + TableIdentifier tableId = TableIdentifier.of(NS, "required_mode_test"); + catalog.createNamespace(NS); + + // Should throw UnsupportedOperationException when trying to create/load the table + // because CATALOG_ONLY mode requires server-side planning but server doesn't support it + assertThatThrownBy(() -> catalog.createTable(tableId, SCHEMA)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("server-side planning") + .hasMessageContaining("CATALOG_ONLY"); + + 
catalog.close(); + } + + @Test + public void scanPlanningModeNone() throws IOException { + // Test client-only mode - should use client-side planning even if server supports it + CatalogWithAdapter catalogWithAdapter = + catalogWithTableLevelConfig(RESTCatalogProperties.SCAN_PLANNING_MODE, "client-only"); + + Table table = createTableWithScanPlanning(catalogWithAdapter.catalog, "none_mode_test"); + table.newAppend().appendFile(FILE_A).commit(); + + // Should not be a RESTTable since client-only mode disables server-side planning + assertThat(table).isNotInstanceOf(RESTTable.class); + + // Should still work with client-side planning + assertThat(table.newScan().planFiles()) + .hasSize(1) + .first() + .extracting(ContentScanTask::file) + .extracting(ContentFile::location) + .isEqualTo(FILE_A.location()); + + catalogWithAdapter.catalog.close(); + } + @Test public void serverSupportsPlanningSyncOnlyNotAsync() { // Server supports submit (sync) but not fetch (async polling) @@ -1020,4 +1090,83 @@ public void serverSupportsPlanningButNotCancellation() throws IOException { // Verify no exception was thrown - cancelPlan returns false when endpoint not supported assertThat(cancelled).isFalse(); } + + @ParameterizedTest + @EnumSource(PlanningMode.class) + public void tableLevelScanPlanningOverride( + Function planMode) + throws IOException { + // Test SCAN_PLANNING_MODE in LoadTableResponse.config() overrides catalog setting + configurePlanningBehavior(planMode); + + // Catalog that adds scan planning config to LoadTableResponse (table-level override) + CatalogWithAdapter catalogWithAdapter = + catalogWithTableLevelConfig(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog-preferred"); + + RESTTable table = restTableFor(catalogWithAdapter.catalog, "table_override_test"); + setParserContext(table); + + assertThat(table.newScan().planFiles()).hasSize(1); + + catalogWithAdapter.catalog.close(); + } + + private CatalogWithAdapter catalogWithTableLevelConfig(String configKey, String 
configValue) { + RESTCatalogAdapter adapter = + Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + if (ResourcePaths.config().equals(request.path())) { + return castResponse( + responseType, + ConfigResponse.builder() + .withEndpoints( + Arrays.stream(Route.values()) + .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) + .collect(Collectors.toList())) + .withOverride(RESTCatalogProperties.SCAN_PLANNING_MODE, "client-only") + .build()); + } + + Object body = roundTripSerialize(request.body(), "request"); + T response = + super.execute( + ImmutableHTTPRequest.builder().from(request).body(body).build(), + responseType, + errorHandler, + responseHeaders); + + // Add config to ALL LoadTableResponse + if (response instanceof LoadTableResponse) { + LoadTableResponse load = (LoadTableResponse) response; + return roundTripSerialize( + castResponse( + responseType, + LoadTableResponse.builder() + .withTableMetadata(load.tableMetadata()) + .addConfig(configKey, configValue) + .addAllCredentials(load.credentials()) + .build()), + "response"); + } + + return roundTripSerialize(response, "response"); + } + }); + + RESTCatalog catalog = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); + catalog.initialize( + "test", + ImmutableMap.of( + // Don't set SCAN_PLANNING_MODE here - let server config control it + // (Setting it here would be client configuration, which is negotiated against the + // server mode and could change the outcome) + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + return new CatalogWithAdapter(catalog, adapter); + } } diff --git a/open-api/rest-catalog-open-api.py b/open-api/rest-catalog-open-api.py index 1079d277d3c7..6b22010607a4 100644 --- a/open-api/rest-catalog-open-api.py +++ b/open-api/rest-catalog-open-api.py @@ -1291,6 +1291,29 @@ class LoadTableResult(BaseModel): ## General 
Configurations - `token`: Authorization bearer token to use for table requests if OAuth2 security is enabled + - `scan-planning-mode`: Controls scan planning behavior for table operations. This property can be configured by: + - **Server**: Returned in `LoadTableResponse.config()` to advertise server preference/requirement + - **Client**: Set in catalog properties to state the client preference/requirement + + **Configuration Precedence**: If only one side sets the property, that value is used. If both sides set it, the negotiation rules below apply (the client value wins only when both sides are PREFERRED). If neither side sets it, the default (`client-preferred`) is used. + + **Valid values**: + - `client-only`: MUST use client-side planning. Fails if paired with server's `catalog-only`. + - `client-preferred` (default): Prefer client-side planning but flexible. + - `catalog-preferred`: Prefer server-side planning but flexible. Falls back to client if server doesn't support planning endpoints. + - `catalog-only`: MUST use server-side planning. Requires server support. Fails if paired with client's `client-only`. + + ### Scan Planning Negotiation + + When both client and server provide `scan-planning-mode` configuration, the final planning decision is negotiated based on the following rules: + + **Negotiation Rules:** + - **Incompatible requirements**: `client-only` + `catalog-only` = **FAIL** + - **ONLY beats PREFERRED**: When one side has "ONLY" and the other has "PREFERRED", the ONLY requirement wins (inflexible beats flexible) + - **Both PREFERRED**: When both are PREFERRED (different types), client config wins + - **Both same**: When both have the same value, use that planning type + - **Only one configured**: Use the configured side (client or server) + - **Neither configured**: Use default (`client-preferred`) ## AWS Configurations diff --git a/open-api/rest-catalog-open-api.yaml b/open-api/rest-catalog-open-api.yaml index d322b0c7c7c0..05666f391df1 100644 --- a/open-api/rest-catalog-open-api.yaml +++ b/open-api/rest-catalog-open-api.yaml @@ -3374,6 +3374,29 @@ components: ## General Configurations - `token`: Authorization bearer token to 
use for table requests if OAuth2 security is enabled + - `scan-planning-mode`: Controls scan planning behavior for table operations. This property can be configured by: + - **Server**: Returned in `LoadTableResponse.config()` to advertise server preference/requirement + - **Client**: Set in catalog properties to state the client preference/requirement + + **Configuration Precedence**: If only one side sets the property, that value is used. If both sides set it, the negotiation rules below apply (the client value wins only when both sides are PREFERRED). If neither side sets it, the default (`client-preferred`) is used. + + **Valid values**: + - `client-only`: MUST use client-side planning. Fails if paired with server's `catalog-only`. + - `client-preferred` (default): Prefer client-side planning but flexible. + - `catalog-preferred`: Prefer server-side planning but flexible. Falls back to client if server doesn't support planning endpoints. + - `catalog-only`: MUST use server-side planning. Requires server support. Fails if paired with client's `client-only`. + + ### Scan Planning Negotiation + + When both client and server provide `scan-planning-mode` configuration, the final planning decision is negotiated based on the following rules: + + **Negotiation Rules:** + - **Incompatible requirements**: `client-only` + `catalog-only` = **FAIL** + - **ONLY beats PREFERRED**: When one side has "ONLY" and the other has "PREFERRED", the ONLY requirement wins (inflexible beats flexible) + - **Both PREFERRED**: When both are PREFERRED (different types), client config wins + - **Both same**: When both have the same value, use that planning type + - **Only one configured**: Use the configured side (client or server) + - **Neither configured**: Use default (`client-preferred`) ## AWS Configurations diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index 14e6c358898c..83c8a2441c2b 100644 --- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -42,7 +42,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.CATALOG_ONLY.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index 14e6c358898c..83c8a2441c2b 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -42,7 +42,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.CATALOG_ONLY.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index 14e6c358898c..83c8a2441c2b 100644 --- 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -42,7 +42,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.CATALOG_ONLY.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index 14e6c358898c..83c8a2441c2b 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -42,7 +42,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.CATALOG_ONLY.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" }