diff --git a/core/src/test/java/org/apache/iceberg/rest/RequestMatcher.java b/core/src/test/java/org/apache/iceberg/rest/RequestMatcher.java new file mode 100644 index 000000000000..18b66e773738 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/RequestMatcher.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import static org.mockito.ArgumentMatchers.argThat; + +import java.util.Map; +import java.util.Objects; + +class RequestMatcher { + private RequestMatcher() {} + + public static HTTPRequest matches(HTTPRequest.HTTPMethod method) { + return argThat(req -> req.method() == method); + } + + static HTTPRequest matches(HTTPRequest.HTTPMethod method, String path) { + return argThat(req -> req.method() == method && req.path().equals(path)); + } + + public static HTTPRequest matches( + HTTPRequest.HTTPMethod method, String path, Map headers) { + return argThat( + req -> + req.method() == method + && req.path().equals(path) + && req.headers().equals(HTTPHeaders.of(headers))); + } + + public static HTTPRequest matches( + HTTPRequest.HTTPMethod method, + String path, + Map headers, + Map parameters) { + return argThat( + req -> + req.method() == method + && req.path().equals(path) + && req.headers().equals(HTTPHeaders.of(headers)) + && req.queryParameters().equals(parameters)); + } + + public static HTTPRequest matches( + HTTPRequest.HTTPMethod method, + String path, + Map headers, + Map parameters, + Object body) { + return argThat( + req -> + req.method() == method + && req.path().equals(path) + && req.headers().equals(HTTPHeaders.of(headers)) + && req.queryParameters().equals(parameters) + && Objects.equals(req.body(), body)); + } + + public static HTTPRequest containsHeaders( + HTTPRequest.HTTPMethod method, String path, Map headers) { + return argThat( + req -> + req.method() == method + && req.path().equals(path) + && req.headers().entries().containsAll(HTTPHeaders.of(headers).entries())); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/TestBaseWithRESTServer.java b/core/src/test/java/org/apache/iceberg/rest/TestBaseWithRESTServer.java new file mode 100644 index 000000000000..f1a172a4237c --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/TestBaseWithRESTServer.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.file.Path; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.responses.ErrorResponse; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; + +public abstract class TestBaseWithRESTServer { + protected static final ObjectMapper MAPPER = RESTObjectMapper.mapper(); + protected static final Namespace NS = Namespace.of("ns"); + protected static final SessionCatalog.SessionContext DEFAULT_SESSION_CONTEXT = + new SessionCatalog.SessionContext( + UUID.randomUUID().toString(), + "user", + ImmutableMap.of("credential", "user:12345"), + ImmutableMap.of()); + + protected InMemoryCatalog backendCatalog; + protected RESTCatalogAdapter adapterForRESTServer; + protected Server httpServer; + protected RESTCatalog restCatalog; + protected ParserContext parserContext; + + @TempDir private Path temp; + + @BeforeEach + public void before() throws Exception { + File warehouse = temp.toFile(); + this.backendCatalog = new InMemoryCatalog(); + this.backendCatalog.initialize( + "in-memory", + ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getAbsolutePath())); + + adapterForRESTServer = createAdapterForServer(); + + ServletContextHandler servletContext = + new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + servletContext.addServlet( + new ServletHolder(new RESTCatalogServlet(adapterForRESTServer)), "/*"); + servletContext.setHandler(new GzipHandler()); + + this.httpServer = new Server(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); + httpServer.setHandler(servletContext); + httpServer.start(); + + restCatalog = initCatalog(catalogName(), ImmutableMap.of()); + } + + @AfterEach + public void after() throws Exception { + if (restCatalog != null) { + restCatalog.close(); + } + + if (backendCatalog != null) { + backendCatalog.close(); + } + + if (httpServer != null) { + httpServer.stop(); + httpServer.join(); + } + } + + protected RESTCatalogAdapter createAdapterForServer() { + return Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + Object body = roundTripSerialize(request.body(), "request"); + HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); + T response = super.execute(req, responseType, errorHandler, responseHeaders); + return roundTripSerialize(response, "response"); + } + }); + } + + protected abstract String catalogName(); + + @SuppressWarnings("unchecked") + protected T roundTripSerialize(T payload, String description) { + if (payload != null) { + try { + if (payload instanceof RESTMessage) { + return (T) MAPPER.readValue(MAPPER.writeValueAsString(payload), payload.getClass()); + } else { + // use Map so that Jackson doesn't try to instantiate ImmutableMap from payload.getClass() + return (T) MAPPER.readValue(MAPPER.writeValueAsString(payload), Map.class); + } + } catch (JsonProcessingException e) { + throw new RuntimeException( + String.format("Failed to serialize and deserialize %s: %s", description, payload), e); + } + } + return null; + } + + private RESTCatalog initCatalog(String catalogName, Map additionalProperties) { + RESTCatalog catalog = + new RESTCatalog( + DEFAULT_SESSION_CONTEXT, + (config) -> + HTTPClient.builder(config) + .uri(config.get(CatalogProperties.URI)) + .withHeaders(RESTUtil.configHeaders(config)) + .build()); + catalog.setConf(new Configuration()); + Map properties = + ImmutableMap.of( + CatalogProperties.URI, + httpServer.getURI().toString(), + CatalogProperties.FILE_IO_IMPL, + "org.apache.iceberg.inmemory.InMemoryFileIO"); + catalog.initialize( + catalogName, + ImmutableMap.builder() + .putAll(properties) + .putAll(additionalProperties) + .build()); + + return catalog; + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/TestFreshnessAwareLoading.java b/core/src/test/java/org/apache/iceberg/rest/TestFreshnessAwareLoading.java new file mode 100644 index 000000000000..75567e95d001 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/TestFreshnessAwareLoading.java @@ -0,0 +1,815 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import static org.apache.iceberg.TestBase.FILE_A; +import static org.apache.iceberg.TestBase.SCHEMA; +import static org.apache.iceberg.rest.RequestMatcher.matches; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.github.benmanes.caffeine.cache.Cache; +import java.time.Duration; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.http.HttpHeaders; +import org.apache.iceberg.BaseMetadataTable; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.RESTException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.rest.responses.ConfigResponse; +import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.FakeTicker; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; + +public class TestFreshnessAwareLoading extends TestBaseWithRESTServer { + private static final ResourcePaths RESOURCE_PATHS = + ResourcePaths.forCatalogProperties( + ImmutableMap.of( + RESTCatalogProperties.NAMESPACE_SEPARATOR, + RESTCatalogAdapter.NAMESPACE_SEPARATOR_URLENCODED_UTF_8)); + private static final TableIdentifier TABLE = TableIdentifier.of(NS, "newtable"); + private static final Duration TABLE_EXPIRATION = + Duration.ofMillis(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT); + private static final Duration HALF_OF_TABLE_EXPIRATION = TABLE_EXPIRATION.dividedBy(2); + + @Override + protected String catalogName() { + return "catalog-freshness-aware-loading"; + } + + @Test + public void eTagWithCreateAndLoadTable() { + Map respHeaders = Maps.newHashMap(); + RESTCatalog catalog = catalogWithResponseHeaders(respHeaders); + + catalog.createNamespace(TABLE.namespace()); + catalog.createTable(TABLE, SCHEMA); + + assertThat(respHeaders).containsKey(HttpHeaders.ETAG); + String eTag = respHeaders.get(HttpHeaders.ETAG); + respHeaders.clear(); + + catalog.loadTable(TABLE); + + assertThat(respHeaders).containsEntry(HttpHeaders.ETAG, eTag); + } + + @Test + public void eTagWithDifferentTables() { + Map respHeaders = Maps.newHashMap(); + RESTCatalog catalog = catalogWithResponseHeaders(respHeaders); + + catalog.createNamespace(TABLE.namespace()); + catalog.createTable(TABLE, SCHEMA); + + assertThat(respHeaders).containsKey(HttpHeaders.ETAG); + String eTagTbl1 = respHeaders.get(HttpHeaders.ETAG); + respHeaders.clear(); + + catalog.createTable(TableIdentifier.of(TABLE.namespace(), "table2"), SCHEMA); + + assertThat(respHeaders).containsKey(HttpHeaders.ETAG); + assertThat(eTagTbl1).isNotEqualTo(respHeaders.get(HttpHeaders.ETAG)); + } + + @Test + public void eTagAfterDataUpdate() { + Map respHeaders = Maps.newHashMap(); + RESTCatalog catalog = catalogWithResponseHeaders(respHeaders); + + catalog.createNamespace(TABLE.namespace()); + Table tbl = catalog.createTable(TABLE, SCHEMA); + + assertThat(respHeaders).containsKey(HttpHeaders.ETAG); + String eTag = respHeaders.get(HttpHeaders.ETAG); + + respHeaders.clear(); + tbl.newAppend().appendFile(FILE_A).commit(); + + assertThat(respHeaders).containsKey(HttpHeaders.ETAG); + assertThat(eTag).isNotEqualTo(respHeaders.get(HttpHeaders.ETAG)); + } + + @Test + public void eTagAfterMetadataOnlyUpdate() { + Map respHeaders = Maps.newHashMap(); + RESTCatalog catalog = catalogWithResponseHeaders(respHeaders); + + catalog.createNamespace(TABLE.namespace()); + Table tbl = catalog.createTable(TABLE, SCHEMA); + + assertThat(respHeaders).containsKey(HttpHeaders.ETAG); + String eTag = respHeaders.get(HttpHeaders.ETAG); + + respHeaders.clear(); + tbl.updateSchema().addColumn("extra", Types.IntegerType.get()).commit(); + + assertThat(respHeaders).containsKey(HttpHeaders.ETAG); + assertThat(eTag).isNotEqualTo(respHeaders.get(HttpHeaders.ETAG)); + } + + @Test + public void eTagWithRegisterTable() { + Map respHeaders = Maps.newHashMap(); + RESTCatalog catalog = catalogWithResponseHeaders(respHeaders); + + catalog.createNamespace(TABLE.namespace()); + Table tbl = catalog.createTable(TABLE, SCHEMA); + + assertThat(respHeaders).containsKey(HttpHeaders.ETAG); + String eTag = respHeaders.get(HttpHeaders.ETAG); + + respHeaders.clear(); + catalog.registerTable( + TableIdentifier.of(TABLE.namespace(), "other_table"), + ((BaseTable) tbl).operations().current().metadataFileLocation()); + + assertThat(respHeaders).containsEntry(HttpHeaders.ETAG, eTag); + } + + @Test + public void notModifiedResponse() { + restCatalog.createNamespace(TABLE.namespace()); + restCatalog.createTable(TABLE, SCHEMA); + Table table = restCatalog.loadTable(TABLE); + + String eTag = + ETagProvider.of(((BaseTable) table).operations().current().metadataFileLocation()); + + Mockito.doAnswer( + invocation -> { + HTTPRequest originalRequest = invocation.getArgument(0); + + assertThat(originalRequest.headers().contains(HttpHeaders.IF_NONE_MATCH)); + assertThat( + originalRequest.headers().firstEntry(HttpHeaders.IF_NONE_MATCH).get().value()) + .isEqualTo(eTag); + + assertThat( + adapterForRESTServer.execute( + originalRequest, + LoadTableResponse.class, + invocation.getArgument(2), + invocation.getArgument(3), + ParserContext.builder().build())) + .isNull(); + + return null; + }) + .when(adapterForRESTServer) + .execute( + matches(HTTPRequest.HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), + eq(LoadTableResponse.class), + any(), + any()); + + assertThat(restCatalog.loadTable(TABLE)).isNotNull(); + + TableIdentifier metadataTableIdentifier = + TableIdentifier.of(NS.toString(), TABLE.name(), "partitions"); + + assertThat(restCatalog.loadTable(metadataTableIdentifier)).isNotNull(); + + Mockito.verify(adapterForRESTServer, times(3)) + .execute( + matches(HTTPRequest.HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), + eq(LoadTableResponse.class), + any(), + any()); + + verify(adapterForRESTServer) + .execute( + matches(HTTPRequest.HTTPMethod.GET, RESOURCE_PATHS.table(metadataTableIdentifier)), + any(), + any(), + any()); + } + + @Test + public void freshnessAwareLoading() { + restCatalog.createNamespace(TABLE.namespace()); + restCatalog.createTable(TABLE, SCHEMA); + + Cache tableCache = + restCatalog.sessionCatalog().tableCache().cache(); + assertThat(tableCache.estimatedSize()).isZero(); + + expectFullTableLoadForLoadTable(TABLE, adapterForRESTServer); + BaseTable tableAfterFirstLoad = (BaseTable) restCatalog.loadTable(TABLE); + + assertThat(tableCache.stats().hitCount()).isZero(); + assertThat(tableCache.asMap()) + .containsOnlyKeys( + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + expectNotModifiedResponseForLoadTable(TABLE, adapterForRESTServer); + BaseTable tableAfterSecondLoad = (BaseTable) restCatalog.loadTable(TABLE); + + assertThat(tableAfterFirstLoad).isNotSameAs(tableAfterSecondLoad); + assertThat(tableAfterFirstLoad.operations().current().location()) + .isEqualTo(tableAfterSecondLoad.operations().current().location()); + assertThat( + tableCache + .asMap() + .get(RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)) + .supplier() + .get() + .operations() + .current() + .metadataFileLocation()) + .isEqualTo(tableAfterFirstLoad.operations().current().metadataFileLocation()); + + Mockito.verify(adapterForRESTServer, times(2)) + .execute( + matches(HTTPRequest.HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), any(), any(), any()); + } + + @Test + public void freshnessAwareLoadingMetadataTables() { + restCatalog.createNamespace(TABLE.namespace()); + restCatalog.createTable(TABLE, SCHEMA); + + Cache tableCache = + restCatalog.sessionCatalog().tableCache().cache(); + assertThat(tableCache.estimatedSize()).isZero(); + + BaseTable table = (BaseTable) restCatalog.loadTable(TABLE); + + assertThat(tableCache.stats().hitCount()).isZero(); + assertThat(tableCache.asMap()) + .containsOnlyKeys( + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + TableIdentifier metadataTableIdentifier = + TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), "partitions"); + + BaseMetadataTable metadataTable = + (BaseMetadataTable) restCatalog.loadTable(metadataTableIdentifier); + + assertThat(tableCache.stats().hitCount()).isEqualTo(1); + assertThat(tableCache.asMap()) + .containsOnlyKeys( + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + assertThat(table).isNotSameAs(metadataTable.table()); + assertThat(table.operations().current().metadataFileLocation()) + .isEqualTo(metadataTable.table().operations().current().metadataFileLocation()); + + ResourcePaths paths = + ResourcePaths.forCatalogProperties( + ImmutableMap.of(RESTCatalogProperties.NAMESPACE_SEPARATOR, "%2E")); + + Mockito.verify(adapterForRESTServer, times(2)) + .execute(matches(HTTPRequest.HTTPMethod.GET, paths.table(TABLE)), any(), any(), any()); + + Mockito.verify(adapterForRESTServer) + .execute( + matches(HTTPRequest.HTTPMethod.GET, paths.table(metadataTableIdentifier)), + any(), + any(), + any()); + } + + @Test + public void renameTableInvalidatesTable() { + runTableInvalidationTest( + restCatalog, + adapterForRESTServer, + catalog -> catalog.renameTable(TABLE, TableIdentifier.of(TABLE.namespace(), "other_table")), + 0); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void dropTableInvalidatesTable(boolean purge) { + runTableInvalidationTest( + restCatalog, adapterForRESTServer, catalog -> catalog.dropTable(TABLE, purge), 0); + } + + @Test + public void tableExistViaHeadRequestInvalidatesTable() { + runTableInvalidationTest( + restCatalog, + adapterForRESTServer, + (catalog -> { + // Use a different catalog to drop the table + catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); + + // The main catalog still has the table in cache + assertThat(catalog.sessionCatalog().tableCache().cache().asMap()) + .containsOnlyKeys( + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + catalog.tableExists(TABLE); + }), + 0); + } + + @Test + public void tableExistViaGetRequestInvalidatesTable() { + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + + // Configure REST server to answer tableExists query via GET + Mockito.doAnswer( + invocation -> + ConfigResponse.builder() + .withEndpoints( + ImmutableList.of( + Endpoint.V1_LOAD_TABLE, + Endpoint.V1_CREATE_NAMESPACE, + Endpoint.V1_CREATE_TABLE)) + .build()) + .when(adapter) + .execute( + matches(HTTPRequest.HTTPMethod.GET, ResourcePaths.config()), + eq(ConfigResponse.class), + any(), + any()); + + RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter); + catalog.initialize( + "catalog", + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + + runTableInvalidationTest( + catalog, + adapter, + cat -> { + // Use a different catalog to drop the table + catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); + + // The main catalog still has the table in cache + assertThat(cat.sessionCatalog().tableCache().cache().asMap()) + .containsOnlyKeys( + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + cat.tableExists(TABLE); + }, + 1); + } + + @Test + public void loadTableInvalidatesCache() { + runTableInvalidationTest( + restCatalog, + adapterForRESTServer, + catalog -> { + // Use a different catalog to drop the table + catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); + + // The main catalog still has the table in cache + assertThat(catalog.sessionCatalog().tableCache().cache().asMap()) + .containsOnlyKeys( + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + assertThatThrownBy(() -> catalog.loadTable(TABLE)) + .isInstanceOf(NoSuchTableException.class) + .hasMessage("Table does not exist: %s", TABLE); + }, + 1); + } + + @Test + public void loadTableWithMetadataTableNameInvalidatesCache() { + TableIdentifier metadataTableIdentifier = + TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), "partitions"); + + runTableInvalidationTest( + restCatalog, + adapterForRESTServer, + catalog -> { + // Use a different catalog to drop the table + catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); + + // The main catalog still has the table in cache + assertThat(catalog.sessionCatalog().tableCache().cache().asMap()) + .containsOnlyKeys( + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + assertThatThrownBy(() -> catalog.loadTable(metadataTableIdentifier)) + .isInstanceOf(NoSuchTableException.class) + .hasMessage("Table does not exist: %s", TABLE); + }, + 1); + + ResourcePaths paths = + ResourcePaths.forCatalogProperties( + ImmutableMap.of(RESTCatalogProperties.NAMESPACE_SEPARATOR, "%2E")); + + Mockito.verify(adapterForRESTServer) + .execute( + matches(HTTPRequest.HTTPMethod.GET, paths.table(metadataTableIdentifier)), + any(), + any(), + any()); + } + + private void runTableInvalidationTest( + RESTCatalog catalog, + RESTCatalogAdapter adapterToVerify, + Consumer action, + int loadTableCountFromAction) { + catalog.createNamespace(TABLE.namespace()); + catalog.createTable(TABLE, SCHEMA); + BaseTable originalTable = (BaseTable) catalog.loadTable(TABLE); + + Cache tableCache = + catalog.sessionCatalog().tableCache().cache(); + assertThat(tableCache.stats().hitCount()).isZero(); + assertThat(tableCache.asMap()) + .containsOnlyKeys( + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + action.accept(catalog); + + // Check that 'action' invalidates cache + assertThat(tableCache.estimatedSize()).isZero(); + + assertThatThrownBy(() -> catalog.loadTable(TABLE)) + .isInstanceOf(NoSuchTableException.class) + .hasMessageContaining("Table does not exist: %s", TABLE); + + catalog.createTable(TABLE, SCHEMA); + expectFullTableLoadForLoadTable(TABLE, adapterToVerify); + BaseTable newTableWithSameName = (BaseTable) catalog.loadTable(TABLE); + + assertThat(tableCache.stats().hitCount()).isEqualTo(loadTableCountFromAction); + assertThat(tableCache.asMap()) + .containsOnlyKeys( + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + assertThat(newTableWithSameName).isNotEqualTo(originalTable); + assertThat(newTableWithSameName.operations().current().metadataFileLocation()) + .isNotEqualTo(originalTable.operations().current().metadataFileLocation()); + + Mockito.verify(adapterToVerify, times(3 + loadTableCountFromAction)) + .execute( + matches(HTTPRequest.HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), any(), any(), any()); + } + + @Test + public void tableCacheWithMultiSessions() { + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + + RESTSessionCatalog sessionCatalog = new RESTSessionCatalog(config -> adapter, null); + sessionCatalog.initialize("test_session_catalog", Map.of()); + + SessionCatalog.SessionContext otherSessionContext = + new SessionCatalog.SessionContext( + "session_id_2", "user", ImmutableMap.of("credential", "user:12345"), ImmutableMap.of()); + + sessionCatalog.createNamespace(DEFAULT_SESSION_CONTEXT, TABLE.namespace()); + sessionCatalog.buildTable(DEFAULT_SESSION_CONTEXT, TABLE, SCHEMA).create(); + expectFullTableLoadForLoadTable(TABLE, adapter); + BaseTable tableSession1 = (BaseTable) sessionCatalog.loadTable(DEFAULT_SESSION_CONTEXT, TABLE); + + Cache tableCache = + sessionCatalog.tableCache().cache(); + assertThat(tableCache.stats().hitCount()).isZero(); + assertThat(tableCache.asMap()) + .containsOnlyKeys( + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); + + expectFullTableLoadForLoadTable(TABLE, adapter); + BaseTable tableSession2 = (BaseTable) sessionCatalog.loadTable(otherSessionContext, TABLE); + + assertThat(tableSession1).isNotEqualTo(tableSession2); + assertThat(tableSession1.operations().current().metadataFileLocation()) + .isEqualTo(tableSession2.operations().current().metadataFileLocation()); + assertThat(tableCache.asMap()) + .containsOnlyKeys( + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE), + RESTTableCache.SessionIdTableId.of(otherSessionContext.sessionId(), TABLE)); + } + + @Test + public void notModified304ResponseWithEmptyTableCache() { + Mockito.doAnswer(invocation -> null) + .when(adapterForRESTServer) + .execute( + matches(HTTPRequest.HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), + eq(LoadTableResponse.class), + any(), + any()); + + restCatalog.createNamespace(TABLE.namespace()); + restCatalog.createTable(TABLE, SCHEMA); + restCatalog.invalidateTable(TABLE); + + // Table is not in the cache and null LoadTableResponse is received + assertThatThrownBy(() -> restCatalog.loadTable(TABLE)) + .isInstanceOf(RESTException.class) + .hasMessage( + "Invalid (NOT_MODIFIED) response for request: method=%s, path=%s", + HTTPRequest.HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)); + } + + @Test + public void tableCacheNotUpdatedWithoutETag() { + RESTCatalogAdapter adapter = + Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + // Wrap the original responseHeaders to not accept ETag. + Consumer> noETagConsumer = + headers -> { + if (!headers.containsKey(HttpHeaders.ETAG)) { + responseHeaders.accept(headers); + } + }; + return super.execute(request, responseType, errorHandler, noETagConsumer); + } + }); + + RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter); + catalog.initialize( + "catalog", + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + + catalog.createNamespace(TABLE.namespace()); + catalog.createTable(TABLE, SCHEMA); + assertThat(catalog.loadTable(TABLE)).isNotNull(); + + assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero(); + } + + @Test + public void tableCacheDisabled() { + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + + RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter); + catalog.initialize( + "catalog", + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, + "org.apache.iceberg.inmemory.InMemoryFileIO", + RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, + "0")); + + catalog.createNamespace(TABLE.namespace()); + catalog.createTable(TABLE, SCHEMA); + + assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero(); + + expectFullTableLoadForLoadTable(TABLE, adapter); + assertThat(catalog.loadTable(TABLE)).isNotNull(); + catalog.sessionCatalog().tableCache().cache().cleanUp(); + + assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero(); + } + + @Test + public void fullTableLoadAfterExpiryFromCache() { + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + + FakeTicker ticker = new FakeTicker(); + + TestableRESTCatalog catalog = + new TestableRESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter, ticker); + catalog.initialize("catalog", Map.of()); + + catalog.createNamespace(TABLE.namespace()); + catalog.createTable(TABLE, SCHEMA); + catalog.loadTable(TABLE); + + Cache tableCache = + catalog.sessionCatalog().tableCache().cache(); + RESTTableCache.SessionIdTableId tableCacheKey = + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE); + + assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey); + assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) + .isPresent() + .get() + .isEqualTo(Duration.ZERO); + + ticker.advance(HALF_OF_TABLE_EXPIRATION); + + assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey); + assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) + .isPresent() + .get() + .isEqualTo(HALF_OF_TABLE_EXPIRATION); + + ticker.advance(HALF_OF_TABLE_EXPIRATION.plus(Duration.ofSeconds(10))); + + assertThat(tableCache.asMap()).doesNotContainKey(tableCacheKey); + + expectFullTableLoadForLoadTable(TABLE, adapter); + assertThat(catalog.loadTable(TABLE)).isNotNull(); + + assertThat(tableCache.stats().hitCount()).isEqualTo(0); + assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey); + assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) + .isPresent() + .get() + .isEqualTo(Duration.ZERO); + } + + @Test + public void tableCacheAgeNotRefreshedAfterAccess() { + FakeTicker ticker = new FakeTicker(); + + TestableRESTCatalog catalog = + new TestableRESTCatalog( + DEFAULT_SESSION_CONTEXT, config -> new RESTCatalogAdapter(backendCatalog), ticker); + catalog.initialize("catalog", Map.of()); + + catalog.createNamespace(TABLE.namespace()); + catalog.createTable(TABLE, SCHEMA); + catalog.loadTable(TABLE); + + ticker.advance(HALF_OF_TABLE_EXPIRATION); + + Cache tableCache = + catalog.sessionCatalog().tableCache().cache(); + RESTTableCache.SessionIdTableId tableCacheKey = + RESTTableCache.SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE); + + assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) + .isPresent() + .get() + .isEqualTo(HALF_OF_TABLE_EXPIRATION); + + assertThat(catalog.loadTable(TABLE)).isNotNull(); + + assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) + .isPresent() + .get() + .isEqualTo(HALF_OF_TABLE_EXPIRATION); + } + + @Test + public void customTableOperationsWithFreshnessAwareLoading() { + class CustomTableOps extends RESTTableOperations { + CustomTableOps( + RESTClient client, + String path, + Supplier> readHeaders, + Supplier> mutationHeaders, + FileIO io, + TableMetadata current, + Set endpoints) { + super(client, path, readHeaders, mutationHeaders, io, current, endpoints); + } + } + + class CustomRESTSessionCatalog extends RESTSessionCatalog { + CustomRESTSessionCatalog( + Function, RESTClient> clientBuilder, + BiFunction, FileIO> ioBuilder) { + super(clientBuilder, ioBuilder); + } + + @Override + protected RESTTableOperations newTableOps( + RESTClient restClient, + String path, + Supplier> readHeaders, + Supplier> mutationHeaders, + FileIO fileIO, + TableMetadata current, + Set supportedEndpoints) { + return new CustomTableOps( + restClient, path, readHeaders, mutationHeaders, fileIO, current, supportedEndpoints); + } + } + + RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); + RESTCatalog catalog = + catalog(adapter, clientBuilder -> new CustomRESTSessionCatalog(clientBuilder, null)); + + catalog.createNamespace(NS); + catalog.createTable(TABLE, SCHEMA); + + expectFullTableLoadForLoadTable(TABLE, adapter); + BaseTable table = (BaseTable) catalog.loadTable(TABLE); + assertThat(table.operations()).isInstanceOf(CustomTableOps.class); + + // When answering loadTable from table cache we still get the injected ops. + expectNotModifiedResponseForLoadTable(TABLE, adapter); + table = (BaseTable) catalog.loadTable(TABLE); + assertThat(table.operations()).isInstanceOf(CustomTableOps.class); + } + + private RESTCatalog catalogWithResponseHeaders(Map respHeaders) { + RESTCatalogAdapter adapter = + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + return super.execute(request, responseType, errorHandler, respHeaders::putAll); + } + }; + + return catalog(adapter); + } + + private RESTCatalog catalog(RESTCatalogAdapter adapter) { + RESTCatalog catalog = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); + catalog.initialize( + "test", + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + return catalog; + } + + private RESTCatalog catalog( + RESTCatalogAdapter adapter, + Function, RESTClient>, RESTSessionCatalog> + sessionCatalogFactory) { + RESTCatalog catalog = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter) { + @Override + protected RESTSessionCatalog newSessionCatalog( + Function, RESTClient> clientBuilder) { + return sessionCatalogFactory.apply(clientBuilder); + } + }; + catalog.initialize( + "test", + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + return catalog; + } + + private void expectFullTableLoadForLoadTable(TableIdentifier ident, RESTCatalogAdapter adapter) { + Answer invocationAssertsFullLoad = + invocation -> + assertThat((LoadTableResponse) invocation.callRealMethod()).isNotEqualTo(null).actual(); + + Mockito.doAnswer(invocationAssertsFullLoad) + .when(adapter) + .execute( + matches(HTTPRequest.HTTPMethod.GET, RESOURCE_PATHS.table(ident)), + eq(LoadTableResponse.class), + any(), + any()); + } + + private void expectNotModifiedResponseForLoadTable( + TableIdentifier ident, RESTCatalogAdapter adapter) { + Answer invocationAssertsFullLoad = + invocation -> + assertThat((LoadTableResponse) invocation.callRealMethod()).isEqualTo(null).actual(); + + Mockito.doAnswer(invocationAssertsFullLoad) + .when(adapter) + .execute( + matches(HTTPRequest.HTTPMethod.GET, RESOURCE_PATHS.table(ident)), + eq(LoadTableResponse.class), + any(), + any()); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 6b981c493da0..2d569ae8264b 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.rest; +import static org.apache.iceberg.rest.RequestMatcher.containsHeaders; +import static org.apache.iceberg.rest.RequestMatcher.matches; import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -33,17 +35,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.benmanes.caffeine.cache.Cache; import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.file.Path; -import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -56,8 +55,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; -import org.apache.http.HttpHeaders; -import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.BaseTable; import org.apache.iceberg.BaseTransaction; import org.apache.iceberg.CatalogProperties; @@ -82,7 +79,6 @@ import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; -import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NotAuthorizedException; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.exceptions.RESTException; @@ -94,11 +90,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.rest.HTTPRequest.HTTPMethod; import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode; -import org.apache.iceberg.rest.RESTTableCache.SessionIdTableId; -import org.apache.iceberg.rest.RESTTableCache.TableWithETag; import org.apache.iceberg.rest.auth.AuthManager; import org.apache.iceberg.rest.auth.AuthManagers; import org.apache.iceberg.rest.auth.AuthSession; @@ -115,7 +108,6 @@ import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.FakeTicker; import org.apache.iceberg.util.Pair; import org.assertj.core.api.InstanceOfAssertFactories; import org.awaitility.Awaitility; @@ -140,16 +132,6 @@ public class TestRESTCatalog extends CatalogTests { ImmutableMap.of( RESTCatalogProperties.NAMESPACE_SEPARATOR, RESTCatalogAdapter.NAMESPACE_SEPARATOR_URLENCODED_UTF_8)); - private static final Duration TABLE_EXPIRATION = - Duration.ofMillis(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT); - private static final Duration HALF_OF_TABLE_EXPIRATION = TABLE_EXPIRATION.dividedBy(2); - - private static final SessionCatalog.SessionContext DEFAULT_SESSION_CONTEXT = - new SessionCatalog.SessionContext( - UUID.randomUUID().toString(), - "user", - ImmutableMap.of("credential", "user:12345"), - ImmutableMap.of()); private static final class IdempotentEnv { private final TableIdentifier ident; @@ -326,7 +308,11 @@ protected RESTCatalog initCatalog(String catalogName, Map additi RESTCatalog catalog = new RESTCatalog( - DEFAULT_SESSION_CONTEXT, + new SessionCatalog.SessionContext( + UUID.randomUUID().toString(), + "user", + ImmutableMap.of("credential", "user:12345"), + ImmutableMap.of()), (config) -> HTTPClient.builder(config) .uri(config.get(CatalogProperties.URI)) @@ -526,13 +512,13 @@ public void testCatalogBasicBearerToken() { // the bearer token should be used for all interactions Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), catalogHeaders), + matches(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), catalogHeaders), any(), any(), any()); @@ -556,21 +542,21 @@ public void testCatalogCredentialNoOauth2ServerUri() { // no token or credential for catalog token exchange Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, ResourcePaths.tokens(), emptyHeaders), + matches(HTTPMethod.POST, ResourcePaths.tokens(), emptyHeaders), eq(OAuthTokenResponse.class), any(), any()); // no token or credential for config Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); // use the catalog token for all interactions Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), catalogHeaders), + matches(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), catalogHeaders), any(), any(), any()); @@ -602,21 +588,21 @@ public void testCatalogCredential(String oauth2ServerUri) { // no token or credential for catalog token exchange Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, oauth2ServerUri, emptyHeaders), + matches(HTTPMethod.POST, oauth2ServerUri, emptyHeaders), eq(OAuthTokenResponse.class), any(), any()); // no token or credential for config Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); // use the catalog token for all interactions Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), catalogHeaders), + matches(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), catalogHeaders), any(), any(), any()); @@ -654,21 +640,21 @@ public void testCatalogBearerTokenWithClientCredential(String oauth2ServerUri) { // use the bearer token for config Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); // use the bearer token to fetch the context token Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, oauth2ServerUri, catalogHeaders), + matches(HTTPMethod.POST, oauth2ServerUri, catalogHeaders), eq(OAuthTokenResponse.class), any(), any()); // use the context token for table existence check Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), contextHeaders), + matches(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), contextHeaders), any(), any(), any()); @@ -708,28 +694,28 @@ public void testCatalogCredentialWithClientCredential(String oauth2ServerUri) { // call client credentials with no initial auth Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, oauth2ServerUri, emptyHeaders), + matches(HTTPMethod.POST, oauth2ServerUri, emptyHeaders), eq(OAuthTokenResponse.class), any(), any()); // use the client credential token for config Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); // use the client credential to fetch the context token Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, oauth2ServerUri, catalogHeaders), + matches(HTTPMethod.POST, oauth2ServerUri, catalogHeaders), eq(OAuthTokenResponse.class), any(), any()); // use the context token for table existence check Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), contextHeaders), + matches(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), contextHeaders), any(), any(), any()); @@ -771,28 +757,28 @@ public void testCatalogBearerTokenAndCredentialWithClientCredential(String oauth // use the bearer token for client credentials Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, oauth2ServerUri, initHeaders), + matches(HTTPMethod.POST, oauth2ServerUri, initHeaders), eq(OAuthTokenResponse.class), any(), any()); // use the client credential token for config Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); // use the client credential to fetch the context token Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, oauth2ServerUri, catalogHeaders), + matches(HTTPMethod.POST, oauth2ServerUri, catalogHeaders), eq(OAuthTokenResponse.class), any(), any()); // use the context token for table existence check Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), contextHeaders), + matches(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), contextHeaders), any(), any(), any()); @@ -954,7 +940,7 @@ private void testClientAuth( Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); @@ -964,14 +950,14 @@ private void testClientAuth( if (!credentials.containsKey("token")) { Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, oauth2ServerUri, catalogHeaders), + matches(HTTPMethod.POST, oauth2ServerUri, catalogHeaders), eq(OAuthTokenResponse.class), any(), any()); } Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), expectedHeaders), + matches(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), expectedHeaders), any(), any(), any()); @@ -1081,7 +1067,7 @@ public void testTableSnapshotLoading() { // verify that the table was loaded with the refs argument verify(adapter, times(1)) .execute( - reqMatcher( + matches( HTTPMethod.GET, RESOURCE_PATHS.table(TABLE), Map.of(), Map.of("snapshots", "refs")), eq(LoadTableResponse.class), any(), @@ -1091,7 +1077,7 @@ public void testTableSnapshotLoading() { assertThat(refsTable.snapshots()).containsExactlyInAnyOrderElementsOf(table.snapshots()); verify(adapter, times(1)) .execute( - reqMatcher( + matches( HTTPMethod.GET, RESOURCE_PATHS.table(TABLE), Map.of(), Map.of("snapshots", "all")), eq(LoadTableResponse.class), any(), @@ -1175,7 +1161,7 @@ public void testTableSnapshotLoadingWithDivergedBranches(String formatVersion) { // verify that the table was loaded with the refs argument verify(adapter, times(1)) .execute( - reqMatcher( + matches( HTTPMethod.GET, RESOURCE_PATHS.table(TABLE), Map.of(), Map.of("snapshots", "refs")), eq(LoadTableResponse.class), any(), @@ -1186,7 +1172,7 @@ public void testTableSnapshotLoadingWithDivergedBranches(String formatVersion) { .containsExactlyInAnyOrderElementsOf(table.snapshots()); verify(adapter, times(1)) .execute( - reqMatcher( + matches( HTTPMethod.GET, RESOURCE_PATHS.table(TABLE), Map.of(), Map.of("snapshots", "all")), eq(LoadTableResponse.class), any(), @@ -1287,7 +1273,7 @@ public void testTableAuth( Mockito.doAnswer(addTableConfig) .when(adapter) .execute( - reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.tables(namespace), expectedContextHeaders), + matches(HTTPMethod.POST, RESOURCE_PATHS.tables(namespace), expectedContextHeaders), eq(LoadTableResponse.class), any(), any()); @@ -1295,7 +1281,7 @@ public void testTableAuth( Mockito.doAnswer(addTableConfig) .when(adapter) .execute( - reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TBL), expectedContextHeaders), + matches(HTTPMethod.GET, RESOURCE_PATHS.table(TBL), expectedContextHeaders), eq(LoadTableResponse.class), any(), any()); @@ -1338,14 +1324,14 @@ public void testTableAuth( Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); // session client credentials flow Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, oauth2ServerUri, catalogHeaders), + matches(HTTPMethod.POST, oauth2ServerUri, catalogHeaders), eq(OAuthTokenResponse.class), any(), any()); @@ -1353,7 +1339,7 @@ public void testTableAuth( // create table request Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.tables(namespace), expectedContextHeaders), + matches(HTTPMethod.POST, RESOURCE_PATHS.tables(namespace), expectedContextHeaders), eq(LoadTableResponse.class), any(), any()); @@ -1363,7 +1349,7 @@ public void testTableAuth( // token exchange to get a table token Mockito.verify(adapter, times(1)) .execute( - reqMatcher(HTTPMethod.POST, oauth2ServerUri, expectedContextHeaders), + matches(HTTPMethod.POST, oauth2ServerUri, expectedContextHeaders), eq(OAuthTokenResponse.class), any(), any()); @@ -1373,7 +1359,7 @@ public void testTableAuth( // load table from catalog + refresh loaded table Mockito.verify(adapter, times(2)) .execute( - reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TBL), expectedTableHeaders), + matches(HTTPMethod.GET, RESOURCE_PATHS.table(TBL), expectedTableHeaders), eq(LoadTableResponse.class), any(), any()); @@ -1381,7 +1367,7 @@ public void testTableAuth( // load table from catalog Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TBL), expectedContextHeaders), + matches(HTTPMethod.GET, RESOURCE_PATHS.table(TBL), expectedContextHeaders), eq(LoadTableResponse.class), any(), any()); @@ -1389,7 +1375,7 @@ public void testTableAuth( // refresh loaded table Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TBL), expectedTableHeaders), + matches(HTTPMethod.GET, RESOURCE_PATHS.table(TBL), expectedTableHeaders), eq(LoadTableResponse.class), any(), any()); @@ -1444,7 +1430,7 @@ public void testCatalogTokenRefresh(String oauth2ServerUri) { // call client credentials with no initial auth Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, oauth2ServerUri, emptyHeaders), + matches(HTTPMethod.POST, oauth2ServerUri, emptyHeaders), eq(OAuthTokenResponse.class), any(), any()); @@ -1452,7 +1438,7 @@ public void testCatalogTokenRefresh(String oauth2ServerUri) { // use the client credential token for config Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); @@ -1466,7 +1452,7 @@ public void testCatalogTokenRefresh(String oauth2ServerUri) { "scope", "catalog"); Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.POST, oauth2ServerUri, catalogHeaders, @@ -1490,7 +1476,7 @@ public void testCatalogTokenRefresh(String oauth2ServerUri) { "Bearer token-exchange-token:sub=client-credentials-token:sub=catalog"); Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.POST, oauth2ServerUri, secondRefreshHeaders, @@ -1559,7 +1545,7 @@ public void testCatalogRefreshedTokenIsUsed(String oauth2ServerUri) { "scope", "catalog"); Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.POST, oauth2ServerUri, emptyHeaders, @@ -1572,7 +1558,7 @@ public void testCatalogRefreshedTokenIsUsed(String oauth2ServerUri) { // use the client credential token for config Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); @@ -1586,7 +1572,7 @@ public void testCatalogRefreshedTokenIsUsed(String oauth2ServerUri) { "scope", "catalog"); Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.POST, oauth2ServerUri, catalogHeaders, @@ -1603,8 +1589,7 @@ public void testCatalogRefreshedTokenIsUsed(String oauth2ServerUri) { "Bearer token-exchange-token:sub=client-credentials-token:sub=catalog"); Mockito.verify(adapter) .execute( - reqMatcher( - HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), refreshedCatalogHeader), + matches(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), refreshedCatalogHeader), any(), any(), any()); @@ -1668,7 +1653,7 @@ public void testCatalogTokenRefreshExchangeDisabled(String oauth2ServerUri) { // call client credentials with no initial auth Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, oauth2ServerUri, emptyHeaders), + matches(HTTPMethod.POST, oauth2ServerUri, emptyHeaders), eq(OAuthTokenResponse.class), any(), any()); @@ -1676,7 +1661,7 @@ public void testCatalogTokenRefreshExchangeDisabled(String oauth2ServerUri) { // use the client credential token for config Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, "v1/config", catalogHeaders), + matches(HTTPMethod.GET, "v1/config", catalogHeaders), eq(ConfigResponse.class), any(), any()); @@ -1684,7 +1669,7 @@ public void testCatalogTokenRefreshExchangeDisabled(String oauth2ServerUri) { // verify the new token request is issued Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.POST, oauth2ServerUri, emptyHeaders, Map.of(), refreshRequest), eq(OAuthTokenResponse.class), any(), @@ -1751,7 +1736,7 @@ public void testCatalogExpiredBearerTokenIsRefreshedWithCredential(String oauth2 "scope", "catalog"); Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.POST, oauth2ServerUri, emptyHeaders, Map.of(), clientCredentialsRequest), eq(OAuthTokenResponse.class), any(), @@ -1759,7 +1744,7 @@ public void testCatalogExpiredBearerTokenIsRefreshedWithCredential(String oauth2 Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); @@ -1772,7 +1757,7 @@ public void testCatalogExpiredBearerTokenIsRefreshedWithCredential(String oauth2 "scope", "catalog"); Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.POST, oauth2ServerUri, OAuth2Util.basicAuthHeaders(credential), @@ -1791,7 +1776,7 @@ public void testCatalogExpiredBearerTokenIsRefreshedWithCredential(String oauth2 "scope", "catalog"); Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.POST, oauth2ServerUri, OAuth2Util.basicAuthHeaders(credential), @@ -1803,7 +1788,7 @@ public void testCatalogExpiredBearerTokenIsRefreshedWithCredential(String oauth2 Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), Map.of("Authorization", "Bearer token-exchange-token:sub=" + token)), @@ -1852,7 +1837,7 @@ public void testCatalogExpiredTokenCredentialRefreshWithExchangeDisabled(String Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.POST, oauth2ServerUri, emptyHeaders, Map.of(), clientCredentialsRequest), eq(OAuthTokenResponse.class), any(), @@ -1860,14 +1845,14 @@ public void testCatalogExpiredTokenCredentialRefreshWithExchangeDisabled(String Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, "v1/config", catalogHeaders), + matches(HTTPMethod.GET, "v1/config", catalogHeaders), eq(ConfigResponse.class), any(), any()); Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.POST, oauth2ServerUri, emptyHeaders, Map.of(), clientCredentialsRequest), eq(OAuthTokenResponse.class), any(), @@ -1897,14 +1882,14 @@ public void testCatalogValidBearerTokenIsNotRefreshed() { Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), OAuth2Util.authHeaders(token)), + matches(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), OAuth2Util.authHeaders(token)), any(), any(), any()); @@ -1990,7 +1975,7 @@ public void testCatalogTokenRefreshFailsAndUsesCredentialForRefresh(String oauth "scope", "catalog"); Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.POST, oauth2ServerUri, emptyHeaders, @@ -2003,7 +1988,7 @@ public void testCatalogTokenRefreshFailsAndUsesCredentialForRefresh(String oauth // use the client credential token for config Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); @@ -2035,8 +2020,7 @@ public void testCatalogTokenRefreshFailsAndUsesCredentialForRefresh(String oauth "Bearer token-exchange-token:sub=client-credentials-token:sub=catalog"); Mockito.verify(adapter) .execute( - reqMatcher( - HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), refreshedCatalogHeader), + matches(HTTPMethod.HEAD, RESOURCE_PATHS.table(TBL), refreshedCatalogHeader), any(), any(), any()); @@ -2103,7 +2087,7 @@ public void testCatalogWithCustomTokenScope(String oauth2ServerUri) { // use the client credential token for config Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); @@ -2169,7 +2153,7 @@ public void testCatalogTokenRefreshDisabledWithToken(String oauth2ServerUri) { Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); @@ -2221,7 +2205,7 @@ public void testCatalogTokenRefreshDisabledWithCredential(String oauth2ServerUri Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), + matches(HTTPMethod.GET, ResourcePaths.config(), catalogHeaders), eq(ConfigResponse.class), any(), any()); @@ -2393,14 +2377,14 @@ public void testPaginationForListNamespaces(int numberOfItems) { Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), Map.of(), Map.of()), + matches(HTTPMethod.GET, ResourcePaths.config(), Map.of(), Map.of()), eq(ConfigResponse.class), any(), any()); Mockito.verify(adapter, times(numberOfItems)) .execute( - reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.namespaces(), Map.of(), Map.of()), + matches(HTTPMethod.POST, RESOURCE_PATHS.namespaces(), Map.of(), Map.of()), eq(CreateNamespaceResponse.class), any(), any()); @@ -2455,14 +2439,14 @@ public void testPaginationForListTables(int numberOfItems) { Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), Map.of(), Map.of()), + matches(HTTPMethod.GET, ResourcePaths.config(), Map.of(), Map.of()), eq(ConfigResponse.class), any(), any()); Mockito.verify(adapter, times(numberOfItems)) .execute( - reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.tables(namespace), Map.of(), Map.of()), + matches(HTTPMethod.POST, RESOURCE_PATHS.tables(namespace), Map.of(), Map.of()), eq(LoadTableResponse.class), any(), any()); @@ -2515,7 +2499,7 @@ public void testCleanupUncommitedFilesForCleanableFailures() { Table table = catalog.loadTable(TABLE); Mockito.doThrow(new NotAuthorizedException("not authorized")) .when(adapter) - .execute(reqMatcher(HTTPMethod.POST), any(), any(), any()); + .execute(matches(HTTPMethod.POST), any(), any(), any()); assertThatThrownBy(() -> catalog.loadTable(TABLE).newFastAppend().appendFile(file).commit()) .isInstanceOf(NotAuthorizedException.class) .hasMessage("not authorized"); @@ -2552,7 +2536,7 @@ public void testNoCleanupForNonCleanableExceptions() { Mockito.doThrow(new ServiceFailureException("some service failure")) .when(adapter) - .execute(reqMatcher(HTTPMethod.POST), any(), any(), any()); + .execute(matches(HTTPMethod.POST), any(), any(), any()); assertThatThrownBy(() -> catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit()) .isInstanceOf(ServiceFailureException.class) .hasMessage("some service failure"); @@ -2586,7 +2570,7 @@ public void testCleanupCleanableExceptionsCreate() { TableIdentifier newTable = TableIdentifier.of(TABLE.namespace(), "some_table"); Mockito.doThrow(new NotAuthorizedException("not authorized")) .when(adapter) - .execute(reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(newTable)), any(), any(), any()); + .execute(matches(HTTPMethod.POST, RESOURCE_PATHS.table(newTable)), any(), any(), any()); Transaction createTableTransaction = catalog.newCreateTableTransaction(newTable, SCHEMA); createTableTransaction.newAppend().appendFile(FILE_A).commit(); @@ -2633,7 +2617,7 @@ public void testNoCleanupForNonCleanableCreateTransaction() { TableIdentifier newTable = TableIdentifier.of(TABLE.namespace(), "some_table"); Mockito.doThrow(new ServiceFailureException("some service failure")) .when(adapter) - .execute(reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(newTable)), any(), any(), any()); + .execute(matches(HTTPMethod.POST, RESOURCE_PATHS.table(newTable)), any(), any(), any()); Transaction createTableTransaction = catalog.newCreateTableTransaction(newTable, SCHEMA); createTableTransaction.newAppend().appendFile(FILE_A).commit(); @@ -2674,7 +2658,7 @@ public void testCleanupCleanableExceptionsReplace() { catalog.createTable(TABLE, SCHEMA); Mockito.doThrow(new NotAuthorizedException("not authorized")) .when(adapter) - .execute(reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)), any(), any(), any()); + .execute(matches(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)), any(), any(), any()); Transaction replaceTableTransaction = catalog.newReplaceTableTransaction(TABLE, SCHEMA, false); replaceTableTransaction.newAppend().appendFile(FILE_A).commit(); @@ -2717,7 +2701,7 @@ public void testNoCleanupForNonCleanableReplaceTransaction() { catalog.createTable(TABLE, SCHEMA); Mockito.doThrow(new ServiceFailureException("some service failure")) .when(adapter) - .execute(reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)), any(), any(), any()); + .execute(matches(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)), any(), any(), any()); Transaction replaceTableTransaction = catalog.newReplaceTableTransaction(TABLE, SCHEMA, false); replaceTableTransaction.newAppend().appendFile(FILE_A).commit(); @@ -2758,13 +2742,13 @@ public void testNamespaceExistsViaHEADRequest() { Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), Map.of(), Map.of()), + matches(HTTPMethod.GET, ResourcePaths.config(), Map.of(), Map.of()), eq(ConfigResponse.class), any(), any()); Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.HEAD, RESOURCE_PATHS.namespace(namespace), Map.of(), Map.of()), + matches(HTTPMethod.HEAD, RESOURCE_PATHS.namespace(namespace), Map.of(), Map.of()), any(), any(), any()); @@ -2807,7 +2791,7 @@ public T execute( Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), Map.of(), Map.of()), + matches(HTTPMethod.GET, ResourcePaths.config(), Map.of(), Map.of()), eq(ConfigResponse.class), any(), any()); @@ -2815,7 +2799,7 @@ public T execute( // verifies that the namespace is loaded via a GET instead of HEAD (V1_NAMESPACE_EXISTS) Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.namespace(namespace), Map.of(), Map.of()), + matches(HTTPMethod.GET, RESOURCE_PATHS.namespace(namespace), Map.of(), Map.of()), any(), any(), any()); @@ -2839,13 +2823,13 @@ public void testTableExistsViaHEADRequest() { Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), Map.of(), Map.of()), + matches(HTTPMethod.GET, ResourcePaths.config(), Map.of(), Map.of()), eq(ConfigResponse.class), any(), any()); Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.HEAD, RESOURCE_PATHS.table(TABLE), Map.of(), Map.of()), + matches(HTTPMethod.HEAD, RESOURCE_PATHS.table(TABLE), Map.of(), Map.of()), any(), any(), any()); @@ -2885,7 +2869,7 @@ public T execute( Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config(), Map.of(), Map.of()), + matches(HTTPMethod.GET, ResourcePaths.config(), Map.of(), Map.of()), eq(ConfigResponse.class), any(), any()); @@ -2893,7 +2877,7 @@ public T execute( // verifies that the table is loaded via a GET instead of HEAD (V1_LOAD_TABLE) Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.GET, RESOURCE_PATHS.table(TABLE), Map.of(), Map.of("snapshots", "all")), any(), any(), @@ -2907,179 +2891,6 @@ public void testTableExistsFallbackToGETRequestWithLegacyServer() { verifyTableExistsFallbackToGETRequest(ConfigResponse.builder().build()); } - @Test - public void testETagWithCreateAndLoadTable() { - Map respHeaders = Maps.newHashMap(); - - RESTCatalog catalog = catalogWithResponseHeaders(respHeaders); - - if (requiresNamespaceCreate()) { - catalog.createNamespace(TABLE.namespace()); - } - - catalog.createTable(TABLE, SCHEMA); - - assertThat(respHeaders).containsKey(HttpHeaders.ETAG); - String eTag = respHeaders.get(HttpHeaders.ETAG); - respHeaders.clear(); - - catalog.loadTable(TABLE); - - assertThat(respHeaders).containsEntry(HttpHeaders.ETAG, eTag); - } - - @Test - public void testETagWithDifferentTables() { - Map respHeaders = Maps.newHashMap(); - - RESTCatalog catalog = catalogWithResponseHeaders(respHeaders); - - if (requiresNamespaceCreate()) { - catalog.createNamespace(TABLE.namespace()); - } - - catalog.createTable(TABLE, SCHEMA); - - assertThat(respHeaders).containsKey(HttpHeaders.ETAG); - String eTagTbl1 = respHeaders.get(HttpHeaders.ETAG); - respHeaders.clear(); - - catalog.createTable(TableIdentifier.of(TABLE.namespace(), "table2"), SCHEMA); - - assertThat(respHeaders).containsKey(HttpHeaders.ETAG); - assertThat(eTagTbl1).isNotEqualTo(respHeaders.get(HttpHeaders.ETAG)); - } - - @Test - public void testETagAfterDataUpdate() { - Map respHeaders = Maps.newHashMap(); - - RESTCatalog catalog = catalogWithResponseHeaders(respHeaders); - - if (requiresNamespaceCreate()) { - catalog.createNamespace(TABLE.namespace()); - } - - Table tbl = catalog.createTable(TABLE, SCHEMA); - - assertThat(respHeaders).containsKey(HttpHeaders.ETAG); - String eTag = respHeaders.get(HttpHeaders.ETAG); - - respHeaders.clear(); - - tbl.newAppend().appendFile(FILE_A).commit(); - - assertThat(respHeaders).containsKey(HttpHeaders.ETAG); - assertThat(eTag).isNotEqualTo(respHeaders.get(HttpHeaders.ETAG)); - } - - @Test - public void testETagAfterMetadataOnlyUpdate() { - Map respHeaders = Maps.newHashMap(); - - RESTCatalog catalog = catalogWithResponseHeaders(respHeaders); - - if (requiresNamespaceCreate()) { - catalog.createNamespace(TABLE.namespace()); - } - - Table tbl = catalog.createTable(TABLE, SCHEMA); - - assertThat(respHeaders).containsKey(HttpHeaders.ETAG); - String eTag = respHeaders.get(HttpHeaders.ETAG); - - respHeaders.clear(); - - tbl.updateSchema().addColumn("extra", Types.IntegerType.get()).commit(); - - assertThat(respHeaders).containsKey(HttpHeaders.ETAG); - assertThat(eTag).isNotEqualTo(respHeaders.get(HttpHeaders.ETAG)); - } - - @Test - public void testETagWithRegisterTable() { - Map respHeaders = Maps.newHashMap(); - - RESTCatalog catalog = catalogWithResponseHeaders(respHeaders); - - if (requiresNamespaceCreate()) { - catalog.createNamespace(TABLE.namespace()); - } - - Table tbl = catalog.createTable(TABLE, SCHEMA); - - assertThat(respHeaders).containsKey(HttpHeaders.ETAG); - String eTag = respHeaders.get(HttpHeaders.ETAG); - - respHeaders.clear(); - - catalog.registerTable( - TableIdentifier.of(TABLE.namespace(), "other_table"), - ((BaseTable) tbl).operations().current().metadataFileLocation()); - - assertThat(respHeaders).containsEntry(HttpHeaders.ETAG, eTag); - } - - @Test - public void testNotModified() { - catalog().createNamespace(TABLE.namespace()); - - catalog().createTable(TABLE, SCHEMA); - - Table table = catalog().loadTable(TABLE); - - String eTag = - ETagProvider.of(((BaseTable) table).operations().current().metadataFileLocation()); - - Mockito.doAnswer( - invocation -> { - HTTPRequest originalRequest = invocation.getArgument(0); - - assertThat(originalRequest.headers().contains(HttpHeaders.IF_NONE_MATCH)); - assertThat( - originalRequest.headers().firstEntry(HttpHeaders.IF_NONE_MATCH).get().value()) - .isEqualTo(eTag); - - assertThat( - adapterForRESTServer.execute( - originalRequest, - LoadTableResponse.class, - invocation.getArgument(2), - invocation.getArgument(3), - ParserContext.builder().build())) - .isNull(); - - return null; - }) - .when(adapterForRESTServer) - .execute( - reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), - eq(LoadTableResponse.class), - any(), - any()); - - catalog().loadTable(TABLE); - - TableIdentifier metadataTableIdentifier = - TableIdentifier.of(NS.toString(), TABLE.name(), "partitions"); - - catalog().loadTable(metadataTableIdentifier); - - Mockito.verify(adapterForRESTServer, times(3)) - .execute( - reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), - eq(LoadTableResponse.class), - any(), - any()); - - verify(adapterForRESTServer) - .execute( - reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(metadataTableIdentifier)), - any(), - any(), - any()); - } - @Test void testDifferentTableUUID() { RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); @@ -3105,7 +2916,7 @@ void testDifferentTableUUID() { Mockito.doAnswer(updateTable) .when(adapter) .execute( - reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)), + matches(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)), eq(LoadTableResponse.class), any(), any()); @@ -3122,22 +2933,6 @@ void testDifferentTableUUID() { .hasMessageMatching("Table UUID does not match: current=.* != refreshed=" + newUUID); } - private RESTCatalog catalogWithResponseHeaders(Map respHeaders) { - RESTCatalogAdapter adapter = - new RESTCatalogAdapter(backendCatalog) { - @Override - public T execute( - HTTPRequest request, - Class responseType, - Consumer errorHandler, - Consumer> responseHeaders) { - return super.execute(request, responseType, errorHandler, respHeaders::putAll); - } - }; - - return catalog(adapter); - } - @Test public void testReconcileOnUnknownSnapshotAddMatchesSnapshotId() { RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); @@ -3164,7 +2959,7 @@ public void testReconcileOnUnknownSnapshotAddMatchesSnapshotId() { }) .when(adapter) .execute( - reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)), + matches(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)), eq(LoadTableResponse.class), any(), any()); @@ -3220,7 +3015,7 @@ public void testCommitStateUnknownNotReconciled() { new CommitStateUnknownException(new ServiceFailureException("Service failed: 503"))) .when(adapter) .execute( - reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)), + matches(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)), eq(LoadTableResponse.class), any(), any()); @@ -3351,7 +3146,7 @@ protected RESTTableOperations newTableOps( // Verify the custom operations were used with custom headers Mockito.verify(adapter, Mockito.atLeastOnce()) .execute( - reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE), customHeaders), + matches(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE), customHeaders), eq(LoadTableResponse.class), any(), any()); @@ -3375,67 +3170,13 @@ protected RESTTableOperations newTableOps( // Verify the custom operations were used with custom headers Mockito.verify(adapter, Mockito.atLeastOnce()) .execute( - reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(table2), customHeaders), + matches(HTTPMethod.POST, RESOURCE_PATHS.table(table2), customHeaders), eq(LoadTableResponse.class), any(), any()); } } - @Test - public void testCustomTableOperationsWithFreshnessAwareLoading() { - class CustomTableOps extends RESTTableOperations { - CustomTableOps( - RESTClient client, - String path, - Supplier> readHeaders, - Supplier> mutationHeaders, - FileIO io, - TableMetadata current, - Set endpoints) { - super(client, path, readHeaders, mutationHeaders, io, current, endpoints); - } - } - - class CustomRESTSessionCatalog extends RESTSessionCatalog { - CustomRESTSessionCatalog( - Function, RESTClient> clientBuilder, - BiFunction, FileIO> ioBuilder) { - super(clientBuilder, ioBuilder); - } - - @Override - protected RESTTableOperations newTableOps( - RESTClient restClient, - String path, - Supplier> readHeaders, - Supplier> mutationHeaders, - FileIO fileIO, - TableMetadata current, - Set supportedEndpoints) { - return new CustomTableOps( - restClient, path, readHeaders, mutationHeaders, fileIO, current, supportedEndpoints); - } - } - - RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); - RESTCatalog catalog = - catalog(adapter, clientBuilder -> new CustomRESTSessionCatalog(clientBuilder, null)); - - catalog.createNamespace(NS); - - catalog.createTable(TABLE, SCHEMA); - - expectFullTableLoadForLoadTable(TABLE, adapter); - BaseTable table = (BaseTable) catalog.loadTable(TABLE); - assertThat(table.operations()).isInstanceOf(CustomTableOps.class); - - // When answering loadTable from table cache we still get the injected ops. - expectNotModifiedResponseForLoadTable(TABLE, adapter); - table = (BaseTable) catalog.loadTable(TABLE); - assertThat(table.operations()).isInstanceOf(CustomTableOps.class); - } - @Test public void testClientAutoSendsIdempotencyWhenServerAdvertises() { ConfigResponse cfgWithIdem = @@ -3630,7 +3371,7 @@ public void nestedNamespaceWithLegacySeparator() { }) .when(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config()), + matches(HTTPMethod.GET, ResourcePaths.config()), eq(ConfigResponse.class), any(), any()); @@ -3662,7 +3403,7 @@ public void nestedNamespaceWithOverriddenSeparator() { }) .when(adapter) .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config()), + matches(HTTPMethod.GET, ResourcePaths.config()), eq(ConfigResponse.class), any(), any()); @@ -3691,7 +3432,7 @@ private void runConfigurableNamespaceSeparatorTest( // Verify the namespace separator in the path Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.POST, expectedPaths.tables(nestedNamespace)), + matches(HTTPMethod.POST, expectedPaths.tables(nestedNamespace)), eq(LoadTableResponse.class), any(), any()); @@ -3699,7 +3440,7 @@ private void runConfigurableNamespaceSeparatorTest( // Verify the namespace separator in query parameters Mockito.verify(adapter) .execute( - reqMatcher( + matches( HTTPMethod.GET, expectedPaths.namespaces(), Map.of(), @@ -3806,7 +3547,7 @@ private static CreateTableRequest createReq(TableIdentifier ident) { private void verifyCreatePost(Namespace ns, Map headers) { verify(adapterForRESTServer, atLeastOnce()) .execute( - reqMatcherContainsHeaders( + containsHeaders( HTTPMethod.POST, ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns), headers), @@ -3836,496 +3577,6 @@ public void testLoadTableWithMissingMetadataFile(@TempDir Path tempDir) { .hasMessageContaining("No in-memory file found for location: " + metadataFileLocation); } - @Test - public void testInvalidTableCacheParameters() { - RESTCatalog catalog = new RESTCatalog(config -> new RESTCatalogAdapter(backendCatalog)); - - assertThatThrownBy( - () -> - catalog.initialize( - "test", Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "0"))) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Invalid expire after write: zero or negative"); - - assertThatThrownBy( - () -> - catalog.initialize( - "test", Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "-1"))) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Invalid expire after write: zero or negative"); - - assertThatThrownBy( - () -> - catalog.initialize( - "test", Map.of(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, "-1"))) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Invalid max entries: negative"); - } - - @Test - public void testFreshnessAwareLoading() { - catalog().createNamespace(TABLE.namespace()); - - catalog().createTable(TABLE, SCHEMA); - - Cache tableCache = - restCatalog.sessionCatalog().tableCache().cache(); - assertThat(tableCache.estimatedSize()).isZero(); - - expectFullTableLoadForLoadTable(TABLE, adapterForRESTServer); - - BaseTable tableAfterFirstLoad = (BaseTable) catalog().loadTable(TABLE); - - assertThat(tableCache.stats().hitCount()).isZero(); - assertThat(tableCache.asMap()) - .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); - - expectNotModifiedResponseForLoadTable(TABLE, adapterForRESTServer); - - BaseTable tableAfterSecondLoad = (BaseTable) catalog().loadTable(TABLE); - - assertThat(tableAfterFirstLoad).isNotEqualTo(tableAfterSecondLoad); - assertThat(tableAfterFirstLoad.operations().current().location()) - .isEqualTo(tableAfterSecondLoad.operations().current().location()); - assertThat( - tableCache - .asMap() - .get(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)) - .supplier() - .get() - .operations() - .current() - .metadataFileLocation()) - .isEqualTo(tableAfterFirstLoad.operations().current().metadataFileLocation()); - - Mockito.verify(adapterForRESTServer, times(2)) - .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), any(), any(), any()); - } - - @Test - public void testFreshnessAwareLoadingMetadataTables() { - catalog().createNamespace(TABLE.namespace()); - - catalog().createTable(TABLE, SCHEMA); - - Cache tableCache = - restCatalog.sessionCatalog().tableCache().cache(); - assertThat(tableCache.estimatedSize()).isZero(); - - BaseTable table = (BaseTable) catalog().loadTable(TABLE); - - assertThat(tableCache.stats().hitCount()).isZero(); - assertThat(tableCache.asMap()) - .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); - - TableIdentifier metadataTableIdentifier = - TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), "partitions"); - - BaseMetadataTable metadataTable = - (BaseMetadataTable) catalog().loadTable(metadataTableIdentifier); - - assertThat(tableCache.stats().hitCount()).isEqualTo(1); - assertThat(tableCache.asMap()) - .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); - - assertThat(table).isNotEqualTo(metadataTable.table()); - assertThat(table.operations().current().metadataFileLocation()) - .isEqualTo(metadataTable.table().operations().current().metadataFileLocation()); - - ResourcePaths paths = - ResourcePaths.forCatalogProperties( - ImmutableMap.of(RESTCatalogProperties.NAMESPACE_SEPARATOR, "%2E")); - - Mockito.verify(adapterForRESTServer, times(2)) - .execute(reqMatcher(HTTPMethod.GET, paths.table(TABLE)), any(), any(), any()); - - Mockito.verify(adapterForRESTServer) - .execute( - reqMatcher(HTTPMethod.GET, paths.table(metadataTableIdentifier)), any(), any(), any()); - } - - @Test - public void testRenameTableInvalidatesTable() { - runTableInvalidationTest( - restCatalog, - adapterForRESTServer, - (catalog) -> - catalog.renameTable(TABLE, TableIdentifier.of(TABLE.namespace(), "other_table")), - 0); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testDropTableInvalidatesTable(boolean purge) { - runTableInvalidationTest( - restCatalog, adapterForRESTServer, (catalog) -> catalog.dropTable(TABLE, purge), 0); - } - - @Test - public void testTableExistViaHeadRequestInvalidatesTable() { - runTableInvalidationTest( - restCatalog, - adapterForRESTServer, - ((catalog) -> { - // Use a different catalog to drop the table - catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); - - // The main catalog still has the table in cache - assertThat(catalog.sessionCatalog().tableCache().cache().asMap()) - .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); - - catalog.tableExists(TABLE); - }), - 0); - } - - @Test - public void testTableExistViaGetRequestInvalidatesTable() { - RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); - - // Configure REST server to answer tableExists query via GET - Mockito.doAnswer( - invocation -> - ConfigResponse.builder() - .withEndpoints( - ImmutableList.of( - Endpoint.V1_LOAD_TABLE, - Endpoint.V1_CREATE_NAMESPACE, - Endpoint.V1_CREATE_TABLE)) - .build()) - .when(adapter) - .execute( - reqMatcher(HTTPMethod.GET, ResourcePaths.config()), - eq(ConfigResponse.class), - any(), - any()); - - RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter); - catalog.initialize( - "catalog", - ImmutableMap.of( - CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); - - runTableInvalidationTest( - catalog, - adapter, - (cat) -> { - // Use a different catalog to drop the table - catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); - - // The main catalog still has the table in cache - assertThat(cat.sessionCatalog().tableCache().cache().asMap()) - .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); - - cat.tableExists(TABLE); - }, - 1); - } - - @Test - public void testLoadTableInvalidatesCache() { - runTableInvalidationTest( - restCatalog, - adapterForRESTServer, - (catalog) -> { - // Use a different catalog to drop the table - catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); - - // The main catalog still has the table in cache - assertThat(catalog.sessionCatalog().tableCache().cache().asMap()) - .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); - - assertThatThrownBy(() -> catalog.loadTable(TABLE)) - .isInstanceOf(NoSuchTableException.class) - .hasMessage("Table does not exist: %s", TABLE); - }, - 1); - } - - @Test - public void testLoadTableWithMetadataTableNameInvalidatesCache() { - TableIdentifier metadataTableIdentifier = - TableIdentifier.of(TABLE.namespace().toString(), TABLE.name(), "partitions"); - - runTableInvalidationTest( - restCatalog, - adapterForRESTServer, - (catalog) -> { - // Use a different catalog to drop the table - catalog(new RESTCatalogAdapter(backendCatalog)).dropTable(TABLE, true); - - // The main catalog still has the table in cache - assertThat(catalog.sessionCatalog().tableCache().cache().asMap()) - .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); - - assertThatThrownBy(() -> catalog.loadTable(metadataTableIdentifier)) - .isInstanceOf(NoSuchTableException.class) - .hasMessage("Table does not exist: %s", TABLE); - }, - 1); - - ResourcePaths paths = - ResourcePaths.forCatalogProperties( - ImmutableMap.of(RESTCatalogProperties.NAMESPACE_SEPARATOR, "%2E")); - - Mockito.verify(adapterForRESTServer) - .execute( - reqMatcher(HTTPMethod.GET, paths.table(metadataTableIdentifier)), any(), any(), any()); - } - - private void runTableInvalidationTest( - RESTCatalog catalog, - RESTCatalogAdapter adapterToVerify, - Consumer action, - int loadTableCountFromAction) { - catalog.createNamespace(TABLE.namespace()); - - catalog.createTable(TABLE, SCHEMA); - - BaseTable originalTable = (BaseTable) catalog.loadTable(TABLE); - - Cache tableCache = - catalog.sessionCatalog().tableCache().cache(); - assertThat(tableCache.stats().hitCount()).isZero(); - assertThat(tableCache.asMap()) - .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); - - action.accept(catalog); - - // Check that 'action' invalidates cache - assertThat(tableCache.estimatedSize()).isZero(); - - assertThatThrownBy(() -> catalog.loadTable(TABLE)) - .isInstanceOf(NoSuchTableException.class) - .hasMessageContaining("Table does not exist: %s", TABLE); - - catalog.createTable(TABLE, SCHEMA); - - expectFullTableLoadForLoadTable(TABLE, adapterToVerify); - - BaseTable newTableWithSameName = (BaseTable) catalog.loadTable(TABLE); - - assertThat(tableCache.stats().hitCount()).isEqualTo(loadTableCountFromAction); - assertThat(tableCache.asMap()) - .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); - - assertThat(newTableWithSameName).isNotEqualTo(originalTable); - assertThat(newTableWithSameName.operations().current().metadataFileLocation()) - .isNotEqualTo(originalTable.operations().current().metadataFileLocation()); - - Mockito.verify(adapterToVerify, times(3 + loadTableCountFromAction)) - .execute(reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), any(), any(), any()); - } - - @Test - public void testTableCacheWithMultiSessions() { - RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); - - RESTSessionCatalog sessionCatalog = new RESTSessionCatalog(config -> adapter, null); - sessionCatalog.initialize("test_session_catalog", Map.of()); - - SessionCatalog.SessionContext otherSessionContext = - new SessionCatalog.SessionContext( - "session_id_2", "user", ImmutableMap.of("credential", "user:12345"), ImmutableMap.of()); - - sessionCatalog.createNamespace(DEFAULT_SESSION_CONTEXT, TABLE.namespace()); - - sessionCatalog.buildTable(DEFAULT_SESSION_CONTEXT, TABLE, SCHEMA).create(); - - expectFullTableLoadForLoadTable(TABLE, adapter); - - sessionCatalog.loadTable(DEFAULT_SESSION_CONTEXT, TABLE); - - Cache tableCache = sessionCatalog.tableCache().cache(); - assertThat(tableCache.stats().hitCount()).isZero(); - assertThat(tableCache.asMap()) - .containsOnlyKeys(SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE)); - - expectFullTableLoadForLoadTable(TABLE, adapter); - - sessionCatalog.loadTable(otherSessionContext, TABLE); - - assertThat(tableCache.asMap()) - .containsOnlyKeys( - SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE), - SessionIdTableId.of(otherSessionContext.sessionId(), TABLE)); - } - - @Test - public void test304NotModifiedResponseWithEmptyTableCache() { - Mockito.doAnswer(invocation -> null) - .when(adapterForRESTServer) - .execute( - reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), - eq(LoadTableResponse.class), - any(), - any()); - - catalog().createNamespace(TABLE.namespace()); - - catalog().createTable(TABLE, SCHEMA); - - catalog().invalidateTable(TABLE); - - // Table is not in the cache and null LoadTableResponse is received - assertThatThrownBy(() -> catalog().loadTable(TABLE)) - .isInstanceOf(RESTException.class) - .hasMessage( - "Invalid (NOT_MODIFIED) response for request: method=%s, path=%s", - HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)); - } - - @Test - public void testTableCacheNotUpdatedWithoutETag() { - RESTCatalogAdapter adapter = - Mockito.spy( - new RESTCatalogAdapter(backendCatalog) { - @Override - public T execute( - HTTPRequest request, - Class responseType, - Consumer errorHandler, - Consumer> responseHeaders) { - // Wrap the original responseHeaders to not accept ETag. - Consumer> noETagConsumer = - headers -> { - if (!headers.containsKey(HttpHeaders.ETAG)) { - responseHeaders.accept(headers); - } - }; - return super.execute(request, responseType, errorHandler, noETagConsumer); - } - }); - - RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter); - catalog.initialize( - "catalog", - ImmutableMap.of( - CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); - - catalog.createNamespace(TABLE.namespace()); - - catalog.createTable(TABLE, SCHEMA); - - catalog.loadTable(TABLE); - - assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero(); - } - - @Test - public void testTableCacheIsDisabled() { - RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); - - RESTCatalog catalog = new RESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter); - catalog.initialize( - "catalog", - ImmutableMap.of( - CatalogProperties.FILE_IO_IMPL, - "org.apache.iceberg.inmemory.InMemoryFileIO", - RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, - "0")); - - catalog.createNamespace(TABLE.namespace()); - - catalog.createTable(TABLE, SCHEMA); - - assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero(); - - expectFullTableLoadForLoadTable(TABLE, adapter); - - catalog.loadTable(TABLE); - - catalog.sessionCatalog().tableCache().cache().cleanUp(); - - assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero(); - } - - @Test - public void testFullTableLoadAfterExpiryFromCache() { - RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); - - FakeTicker ticker = new FakeTicker(); - - TestableRESTCatalog catalog = - new TestableRESTCatalog(DEFAULT_SESSION_CONTEXT, config -> adapter, ticker); - catalog.initialize("catalog", Map.of()); - - catalog.createNamespace(TABLE.namespace()); - - catalog.createTable(TABLE, SCHEMA); - - catalog.loadTable(TABLE); - - Cache tableCache = - catalog.sessionCatalog().tableCache().cache(); - SessionIdTableId tableCacheKey = - SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE); - - assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey); - assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) - .isPresent() - .get() - .isEqualTo(Duration.ZERO); - - ticker.advance(HALF_OF_TABLE_EXPIRATION); - - assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey); - assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) - .isPresent() - .get() - .isEqualTo(HALF_OF_TABLE_EXPIRATION); - - ticker.advance(HALF_OF_TABLE_EXPIRATION.plus(Duration.ofSeconds(10))); - - assertThat(tableCache.asMap()).doesNotContainKey(tableCacheKey); - - expectFullTableLoadForLoadTable(TABLE, adapter); - - catalog.loadTable(TABLE); - - assertThat(tableCache.stats().hitCount()).isEqualTo(0); - assertThat(tableCache.asMap()).containsOnlyKeys(tableCacheKey); - assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) - .isPresent() - .get() - .isEqualTo(Duration.ZERO); - } - - @Test - public void testTableCacheAgeDoesNotRefreshesAfterAccess() { - FakeTicker ticker = new FakeTicker(); - - TestableRESTCatalog catalog = - new TestableRESTCatalog( - DEFAULT_SESSION_CONTEXT, config -> new RESTCatalogAdapter(backendCatalog), ticker); - catalog.initialize("catalog", Map.of()); - - catalog.createNamespace(TABLE.namespace()); - - catalog.createTable(TABLE, SCHEMA); - - catalog.loadTable(TABLE); - - ticker.advance(HALF_OF_TABLE_EXPIRATION); - - Cache tableCache = - catalog.sessionCatalog().tableCache().cache(); - SessionIdTableId tableCacheKey = - SessionIdTableId.of(DEFAULT_SESSION_CONTEXT.sessionId(), TABLE); - - assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) - .isPresent() - .get() - .isEqualTo(HALF_OF_TABLE_EXPIRATION); - - catalog.loadTable(TABLE); - - assertThat(tableCache.policy().expireAfterWrite().get().ageOf(tableCacheKey)) - .isPresent() - .get() - .isEqualTo(HALF_OF_TABLE_EXPIRATION); - } - private RESTCatalog catalog(RESTCatalogAdapter adapter) { RESTCatalog catalog = new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); @@ -4355,95 +3606,6 @@ protected RESTSessionCatalog newSessionCatalog( return catalog; } - private void expectFullTableLoadForLoadTable(TableIdentifier ident, RESTCatalogAdapter adapter) { - Answer invocationAssertsFullLoad = - invocation -> { - LoadTableResponse response = (LoadTableResponse) invocation.callRealMethod(); - - assertThat(response).isNotEqualTo(null); - - return response; - }; - - Mockito.doAnswer(invocationAssertsFullLoad) - .when(adapter) - .execute( - reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(ident)), - eq(LoadTableResponse.class), - any(), - any()); - } - - private void expectNotModifiedResponseForLoadTable( - TableIdentifier ident, RESTCatalogAdapter adapter) { - Answer invocationAssertsFullLoad = - invocation -> { - LoadTableResponse response = (LoadTableResponse) invocation.callRealMethod(); - - assertThat(response).isEqualTo(null); - - return response; - }; - - Mockito.doAnswer(invocationAssertsFullLoad) - .when(adapter) - .execute( - reqMatcher(HTTPMethod.GET, RESOURCE_PATHS.table(ident)), - eq(LoadTableResponse.class), - any(), - any()); - } - - static HTTPRequest reqMatcher(HTTPMethod method) { - return argThat(req -> req.method() == method); - } - - static HTTPRequest reqMatcher(HTTPMethod method, String path) { - return argThat(req -> req.method() == method && req.path().equals(path)); - } - - static HTTPRequest reqMatcher(HTTPMethod method, String path, Map headers) { - return argThat( - req -> - req.method() == method - && req.path().equals(path) - && req.headers().equals(HTTPHeaders.of(headers))); - } - - static HTTPRequest reqMatcher( - HTTPMethod method, String path, Map headers, Map parameters) { - return argThat( - req -> - req.method() == method - && req.path().equals(path) - && req.headers().equals(HTTPHeaders.of(headers)) - && req.queryParameters().equals(parameters)); - } - - static HTTPRequest reqMatcher( - HTTPMethod method, - String path, - Map headers, - Map parameters, - Object body) { - return argThat( - req -> - req.method() == method - && req.path().equals(path) - && req.headers().equals(HTTPHeaders.of(headers)) - && req.queryParameters().equals(parameters) - && Objects.equals(req.body(), body)); - } - - static HTTPRequest reqMatcherContainsHeaders( - HTTPMethod method, String path, Map headers) { - return argThat( - req -> - req.method() == method - && req.path().equals(path) - && req.headers().entries().containsAll(HTTPHeaders.of(headers).entries())); - } - private static List allRequests(RESTCatalogAdapter adapter) { ArgumentCaptor captor = ArgumentCaptor.forClass(HTTPRequest.class); verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(), any()); diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java index 0b1453682b75..ab0e1d9c56d0 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java @@ -32,20 +32,14 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; -import java.io.File; import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.nio.file.Path; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.ContentFile; import org.apache.iceberg.ContentScanTask; @@ -60,11 +54,9 @@ import org.apache.iceberg.ScanTask; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; -import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.inmemory.InMemoryCatalog; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -72,126 +64,53 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.ErrorResponse; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.gzip.GzipHandler; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.mockito.Mockito; -public class TestRESTScanPlanning { - private static final ObjectMapper MAPPER = RESTObjectMapper.mapper(); - private static final Namespace NS = Namespace.of("ns"); - - private InMemoryCatalog backendCatalog; - private Server httpServer; - private RESTCatalogAdapter adapterForRESTServer; - private ParserContext parserContext; - @TempDir private Path temp; - private RESTCatalog restCatalogWithScanPlanning; - - @BeforeEach - public void setupCatalogs() throws Exception { - File warehouse = temp.toFile(); - this.backendCatalog = new InMemoryCatalog(); - this.backendCatalog.initialize( - "in-memory", - ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getAbsolutePath())); - - adapterForRESTServer = - Mockito.spy( - new RESTCatalogAdapter(backendCatalog) { - @Override - public T execute( - HTTPRequest request, - Class responseType, - Consumer errorHandler, - Consumer> responseHeaders) { - if (ResourcePaths.config().equals(request.path())) { - return castResponse( - responseType, - ConfigResponse.builder() - .withEndpoints( - Arrays.stream(Route.values()) - .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) - .collect(Collectors.toList())) - .withOverrides( - ImmutableMap.of( - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true")) - .build()); - } - Object body = roundTripSerialize(request.body(), "request"); - HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); - T response = super.execute(req, responseType, errorHandler, responseHeaders); - return roundTripSerialize(response, "response"); - } - }); - - ServletContextHandler servletContext = - new ServletContextHandler(ServletContextHandler.NO_SESSIONS); - servletContext.addServlet( - new ServletHolder(new RESTCatalogServlet(adapterForRESTServer)), "/*"); - servletContext.setHandler(new GzipHandler()); - - this.httpServer = new Server(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); - httpServer.setHandler(servletContext); - httpServer.start(); - - // Initialize catalog with scan planning enabled - this.restCatalogWithScanPlanning = initCatalog("prod-with-scan-planning", ImmutableMap.of()); - } - - @AfterEach - public void teardownCatalogs() throws Exception { - if (restCatalogWithScanPlanning != null) { - restCatalogWithScanPlanning.close(); - } - - if (backendCatalog != null) { - backendCatalog.close(); - } - - if (httpServer != null) { - httpServer.stop(); - httpServer.join(); - } +public class TestRESTScanPlanning extends TestBaseWithRESTServer { + @Override + protected RESTCatalogAdapter createAdapterForServer() { + return Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + if (ResourcePaths.config().equals(request.path())) { + return castResponse( + responseType, + ConfigResponse.builder() + .withEndpoints( + Arrays.stream(Route.values()) + .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) + .collect(Collectors.toList())) + .withOverrides( + ImmutableMap.of(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true")) + .build()); + } + Object body = roundTripSerialize(request.body(), "request"); + HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); + T response = super.execute(req, responseType, errorHandler, responseHeaders); + return roundTripSerialize(response, "response"); + } + }); + } + + @Override + protected String catalogName() { + return "prod-with-scan-planning"; } // ==================== Helper Methods ==================== - private RESTCatalog initCatalog(String catalogName, Map additionalProperties) { - RESTCatalog catalog = - new RESTCatalog( - SessionCatalog.SessionContext.createEmpty(), - (config) -> - HTTPClient.builder(config) - .uri(config.get(CatalogProperties.URI)) - .withHeaders(RESTUtil.configHeaders(config)) - .build()); - catalog.setConf(new Configuration()); - Map properties = - ImmutableMap.of( - CatalogProperties.URI, - httpServer.getURI().toString(), - CatalogProperties.FILE_IO_IMPL, - "org.apache.iceberg.inmemory.InMemoryFileIO"); - catalog.initialize( - catalogName, - ImmutableMap.builder() - .putAll(properties) - .putAll(additionalProperties) - .build()); - return catalog; - } - + @Override @SuppressWarnings("unchecked") - private T roundTripSerialize(T payload, String description) { + protected T roundTripSerialize(T payload, String description) { if (payload == null) { return null; } @@ -219,10 +138,6 @@ private void setParserContext(Table table) { ParserContext.builder().add("specsById", table.specs()).add("caseSensitive", false).build(); } - private RESTCatalog scanPlanningCatalog() { - return restCatalogWithScanPlanning; - } - private void configurePlanningBehavior( Function configurator) { TestPlanningBehavior.Builder builder = TestPlanningBehavior.builder(); @@ -335,7 +250,7 @@ void scanPlanningWithAllTasksInSingleResponse( Function planMode) throws IOException { configurePlanningBehavior(planMode); - Table table = restTableFor(scanPlanningCatalog(), "all_tasks_table"); + Table table = restTableFor(restCatalog, "all_tasks_table"); setParserContext(table); // Verify actual data file is returned with correct count @@ -354,7 +269,7 @@ void scanPlanningWithBatchScan( Function planMode) throws IOException { configurePlanningBehavior(planMode); - Table table = restTableFor(scanPlanningCatalog(), "batch_scan_table"); + Table table = restTableFor(restCatalog, "batch_scan_table"); setParserContext(table); // Verify actual data file is returned with correct count @@ -372,7 +287,7 @@ public void nestedPlanTaskPagination() throws IOException { // Configure: synchronous planning with very small pages (creates nested plan task structure) configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination); - Table table = restTableFor(scanPlanningCatalog(), "nested_plan_task_table"); + Table table = restTableFor(restCatalog, "nested_plan_task_table"); // add one more files for proper pagination table.newFastAppend().appendFile(FILE_B).commit(); setParserContext(table); @@ -393,7 +308,7 @@ public void nestedPlanTaskPagination() throws IOException { @Test public void cancelPlanMethodAvailability() { configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination); - RESTTable table = restTableFor(scanPlanningCatalog(), "cancel_method_table"); + RESTTable table = restTableFor(restCatalog, "cancel_method_table"); RESTTableScan restTableScan = restTableScanFor(table); // Test that cancelPlan method is available and callable @@ -407,7 +322,7 @@ public void cancelPlanMethodAvailability() { @Test public void iterableCloseTriggersCancel() throws IOException { configurePlanningBehavior(TestPlanningBehavior.Builder::asynchronous); - RESTTable restTable = restTableFor(scanPlanningCatalog(), "iterable_close_test"); + RESTTable restTable = restTableFor(restCatalog, "iterable_close_test"); setParserContext(restTable); TableScan scan = restTable.newScan(); @@ -430,7 +345,7 @@ public void iterableCloseTriggersCancel() throws IOException { @EnumSource(MetadataTableType.class) public void metadataTablesWithRemotePlanning(MetadataTableType type) { configurePlanningBehavior(TestPlanningBehavior.Builder::synchronous); - RESTTable table = restTableFor(scanPlanningCatalog(), "metadata_tables_test"); + RESTTable table = restTableFor(restCatalog, "metadata_tables_test"); table.newAppend().appendFile(FILE_B).commit(); table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_EQUALITY_DELETES).commit(); setParserContext(table); @@ -452,7 +367,7 @@ public void metadataTablesWithRemotePlanning(MetadataTableType type) { void remoteScanPlanningWithEmptyTable( Function planMode) { configurePlanningBehavior(planMode); - Table table = createTableWithScanPlanning(scanPlanningCatalog(), "empty_table_test"); + Table table = createTableWithScanPlanning(restCatalog, "empty_table_test"); setParserContext(table); assertThat(table.newScan().planFiles()).isEmpty(); } @@ -463,7 +378,7 @@ void remoteScanPlanningWithEmptyTable( void remoteScanPlanningWithNonExistentColumn( Function planMode) { configurePlanningBehavior(planMode); - Table table = restTableFor(scanPlanningCatalog(), "non-existent_column"); + Table table = restTableFor(restCatalog, "non-existent_column"); setParserContext(table); assertThat(table.newScan().select("non-existent-column").planFiles()).isEmpty(); } @@ -473,7 +388,7 @@ void remoteScanPlanningWithNonExistentColumn( void incrementalScan( Function planMode) { configurePlanningBehavior(planMode); - Table table = restTableFor(scanPlanningCatalog(), "incremental_scan"); + Table table = restTableFor(restCatalog, "incremental_scan"); setParserContext(table); // Add second file to the table @@ -499,7 +414,7 @@ void remoteScanPlanningWithPositionDeletes( Function planMode) throws IOException { configurePlanningBehavior(planMode); - Table table = restTableFor(scanPlanningCatalog(), "position_deletes_test"); + Table table = restTableFor(restCatalog, "position_deletes_test"); setParserContext(table); // Add position deletes that correspond to FILE_A (which was added in table creation) @@ -535,7 +450,7 @@ void remoteScanPlanningWithEqualityDeletes( Function planMode) throws IOException { configurePlanningBehavior(planMode); - Table table = restTableFor(scanPlanningCatalog(), "equality_deletes_test"); + Table table = restTableFor(restCatalog, "equality_deletes_test"); setParserContext(table); // Add equality deletes that correspond to FILE_A @@ -569,7 +484,7 @@ void remoteScanPlanningWithMixedDeletes( Function planMode) throws IOException { configurePlanningBehavior(planMode); - Table table = restTableFor(scanPlanningCatalog(), "mixed_deletes_test"); + Table table = restTableFor(restCatalog, "mixed_deletes_test"); setParserContext(table); // Add both position and equality deletes in separate commits @@ -606,7 +521,7 @@ void remoteScanPlanningWithMultipleDeleteFiles( Function planMode) throws IOException { configurePlanningBehavior(planMode); - Table table = restTableFor(scanPlanningCatalog(), "multiple_deletes_test"); + Table table = restTableFor(restCatalog, "multiple_deletes_test"); setParserContext(table); // Add FILE_B and FILE_C to the table (FILE_A is already added during table creation) @@ -668,7 +583,7 @@ void remoteScanPlanningWithDeletesAndFiltering( Function planMode) throws IOException { configurePlanningBehavior(planMode); - Table table = restTableFor(scanPlanningCatalog(), "deletes_filtering_test"); + Table table = restTableFor(restCatalog, "deletes_filtering_test"); setParserContext(table); // Add FILE_B to have more data for filtering @@ -713,7 +628,7 @@ void remoteScanPlanningDeletesCancellation( Function planMode) throws IOException { configurePlanningBehavior(planMode); - Table table = restTableFor(scanPlanningCatalog(), "deletes_cancellation_test"); + Table table = restTableFor(restCatalog, "deletes_cancellation_test"); setParserContext(table); // Add deletes to make the scenario more complex @@ -742,7 +657,7 @@ void remoteScanPlanningWithTimeTravel( configurePlanningBehavior(planMode); // Create table and add FILE_A (snapshot 1) - Table table = restTableFor(scanPlanningCatalog(), "snapshot_scan_test"); + Table table = restTableFor(restCatalog, "snapshot_scan_test"); setParserContext(table); table.refresh(); long snapshot1Id = table.currentSnapshot().snapshotId(); @@ -814,7 +729,7 @@ void remoteScanPlanningWithTimeTravel( public void scanPlanningWithMultiplePartitionSpecs() throws IOException { configurePlanningBehavior(TestPlanningBehavior.Builder::synchronous); - RESTTable table = restTableFor(scanPlanningCatalog(), "multiple_partition_specs"); + RESTTable table = restTableFor(restCatalog, "multiple_partition_specs"); table.newFastAppend().appendFile(FILE_B).commit(); // Evolve partition spec to bucket by id with 8 buckets instead of 16 @@ -861,19 +776,17 @@ public void scanPlanningWithMultiplePartitionSpecs() throws IOException { @Test void remoteScanPlanningWithFreshnessAwareLoading() throws IOException { - RESTCatalog catalog = scanPlanningCatalog(); - TableIdentifier tableIdentifier = TableIdentifier.of(NS, "freshness_aware_loading_test"); - restTableFor(catalog, tableIdentifier.name()); + restTableFor(restCatalog, tableIdentifier.name()); - assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero(); + assertThat(restCatalog.sessionCatalog().tableCache().cache().estimatedSize()).isZero(); // Table is cached with the first loadTable - catalog.loadTable(tableIdentifier); - assertThat(catalog.sessionCatalog().tableCache().cache().estimatedSize()).isOne(); + restCatalog.loadTable(tableIdentifier); + assertThat(restCatalog.sessionCatalog().tableCache().cache().estimatedSize()).isOne(); // Second loadTable is answered from cache - Table table = catalog.loadTable(tableIdentifier); + Table table = restCatalog.loadTable(tableIdentifier); // Verify table is RESTTable and newScan() returns RESTTableScan restTableScanFor(table); diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTTableCache.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTTableCache.java new file mode 100644 index 000000000000..165b508e5272 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTTableCache.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import static org.apache.iceberg.rest.RESTTableCache.SessionIdTableId; +import static org.apache.iceberg.rest.RESTTableCache.TableWithETag; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.file.Path; +import java.time.Duration; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.TestTables; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.util.FakeTicker; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestRESTTableCache { + @TempDir private static Path temp; + + private static final String SESSION_ID = SessionCatalog.SessionContext.createEmpty().sessionId(); + private static final String TABLE_NAME = "tbl"; + private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("ns", TABLE_NAME); + private static final Supplier TABLE_SUPPLIER = + () -> + new BaseTable(new TestTables.TestTableOperations(TABLE_NAME, temp.toFile()), TABLE_NAME); + private static final String ETAG = "d7sa6das"; + private static final Duration HALF_OF_TABLE_EXPIRATION = + Duration.ofMillis(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS_DEFAULT) + .dividedBy(2); + + @Test + public void invalidProperties() { + assertThatThrownBy( + () -> + new RESTTableCache( + Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "0"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid expire after write: zero or negative"); + + assertThatThrownBy( + () -> + new RESTTableCache( + Map.of(RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, "-1"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid expire after write: zero or negative"); + + assertThatThrownBy( + () -> new RESTTableCache(Map.of(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, "-1"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid max entries: negative"); + } + + @Test + public void basicPutAndGet() { + RESTTableCache cache = new RESTTableCache(Map.of()); + cache.put(SESSION_ID, TABLE_IDENTIFIER, TABLE_SUPPLIER, ETAG); + + assertThat(cache.cache().asMap()).hasSize(1); + assertThat(cache.cache().asMap()) + .containsKeys(SessionIdTableId.of(SESSION_ID, TABLE_IDENTIFIER)); + + TableWithETag tableWithETag = cache.getIfPresent(SESSION_ID, TABLE_IDENTIFIER); + + assertThat(tableWithETag.supplier()).isSameAs(TABLE_SUPPLIER); + assertThat(tableWithETag.eTag()).isSameAs(ETAG); + } + + @Test + public void notFoundInCache() { + RESTTableCache cache = new RESTTableCache(Map.of()); + cache.put(SESSION_ID, TABLE_IDENTIFIER, TABLE_SUPPLIER, ETAG); + + assertThat(cache.getIfPresent("some_id", TABLE_IDENTIFIER)).isNull(); + assertThat(cache.getIfPresent(SESSION_ID, TableIdentifier.of("ns", "other_table"))).isNull(); + } + + @Test + public void tableInMultipleSessions() { + RESTTableCache cache = new RESTTableCache(Map.of()); + String otherSessionId = "sessionID2"; + cache.put(SESSION_ID, TABLE_IDENTIFIER, TABLE_SUPPLIER, ETAG); + cache.put(otherSessionId, TABLE_IDENTIFIER, TABLE_SUPPLIER, ETAG); + + assertThat(cache.cache().asMap()).hasSize(2); + + cache.invalidate(SESSION_ID, TABLE_IDENTIFIER); + TableWithETag tableWithETag = cache.getIfPresent(otherSessionId, TABLE_IDENTIFIER); + cache.cache().cleanUp(); + + assertThat(cache.cache().asMap()).hasSize(1); + assertThat(cache.getIfPresent(SESSION_ID, TABLE_IDENTIFIER)).isNull(); + assertThat(tableWithETag.supplier()).isSameAs(TABLE_SUPPLIER); + assertThat(tableWithETag.eTag()).isSameAs(ETAG); + } + + @Test + public void maxEntriesReached() { + RESTTableCache cache = new RESTTableCache(Map.of()); + // Add more items than the max limit + for (int i = 0; i < RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES_DEFAULT + 10; ++i) { + cache.put(SESSION_ID, TableIdentifier.of("ns", "tbl" + i), TABLE_SUPPLIER, ETAG); + } + cache.cache().cleanUp(); + + assertThat(cache.cache().asMap()) + .hasSize(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES_DEFAULT); + } + + @Test + public void configureMaxEntriesReached() { + RESTTableCache cache = + new RESTTableCache(Map.of(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, "1")); + TableIdentifier otherTableIdentifier = TableIdentifier.of("ns", "other_table"); + cache.put(SESSION_ID, TABLE_IDENTIFIER, TABLE_SUPPLIER, ETAG); + cache.put(SESSION_ID, otherTableIdentifier, TABLE_SUPPLIER, ETAG); + cache.cache().cleanUp(); + + assertThat(cache.cache().asMap()).hasSize(1); + assertThat(cache.getIfPresent(SESSION_ID, otherTableIdentifier)).isNotNull(); + assertThat(cache.getIfPresent(SESSION_ID, TABLE_IDENTIFIER)).isNull(); + } + + @Test + public void cacheTurnedOff() { + RESTTableCache cache = + new RESTTableCache(Map.of(RESTCatalogProperties.TABLE_CACHE_MAX_ENTRIES, "0")); + cache.put(SESSION_ID, TABLE_IDENTIFIER, TABLE_SUPPLIER, ETAG); + cache.cache().cleanUp(); + + assertThat(cache.cache().asMap()).isEmpty(); + } + + @Test + public void entryExpires() { + FakeTicker ticker = new FakeTicker(); + RESTTableCache cache = new RESTTableCache(Map.of(), ticker); + cache.put(SESSION_ID, TABLE_IDENTIFIER, TABLE_SUPPLIER, ETAG); + + SessionIdTableId cacheKey = SessionIdTableId.of(SESSION_ID, TABLE_IDENTIFIER); + assertThat(cache.cache().policy().expireAfterAccess()).isNotPresent(); + assertThat(cache.cache().policy().expireAfterWrite().get().ageOf(cacheKey)) + .isPresent() + .get() + .isEqualTo(Duration.ZERO); + + ticker.advance(HALF_OF_TABLE_EXPIRATION); + + assertThat(cache.cache().asMap()).containsOnlyKeys(cacheKey); + assertThat(cache.cache().policy().expireAfterWrite().get().ageOf(cacheKey)) + .isPresent() + .get() + .isEqualTo(HALF_OF_TABLE_EXPIRATION); + + ticker.advance(HALF_OF_TABLE_EXPIRATION.plus(Duration.ofSeconds(10))); + cache.cache().cleanUp(); + + assertThat(cache.cache().asMap()).doesNotContainKey(cacheKey); + } + + @Test + public void configureExpiration() { + FakeTicker ticker = new FakeTicker(); + Duration expirationInterval = Duration.ofSeconds(30); + RESTTableCache cache = + new RESTTableCache( + Map.of( + RESTCatalogProperties.TABLE_CACHE_EXPIRE_AFTER_WRITE_MS, + String.valueOf(expirationInterval.toMillis())), + ticker); + cache.put(SESSION_ID, TABLE_IDENTIFIER, TABLE_SUPPLIER, ETAG); + + assertThat(cache.getIfPresent(SESSION_ID, TABLE_IDENTIFIER)).isNotNull(); + + ticker.advance(expirationInterval.plus(Duration.ofSeconds(10))); + cache.cache().cleanUp(); + + assertThat(cache.getIfPresent(SESSION_ID, TABLE_IDENTIFIER)).isNull(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTViewCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTViewCatalog.java index 8bfe26b18cda..fd2faf55087c 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTViewCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTViewCatalog.java @@ -18,7 +18,7 @@ */ package org.apache.iceberg.rest; -import static org.apache.iceberg.rest.TestRESTCatalog.reqMatcher; +import static org.apache.iceberg.rest.RequestMatcher.matches; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -199,11 +199,11 @@ public void testPaginationForListViews(int numberOfItems) { assertThat(views).hasSize(numberOfItems); Mockito.verify(adapter) - .execute(reqMatcher(HTTPMethod.GET, "v1/config"), eq(ConfigResponse.class), any(), any()); + .execute(matches(HTTPMethod.GET, "v1/config"), eq(ConfigResponse.class), any(), any()); Mockito.verify(adapter, times(numberOfItems)) .execute( - reqMatcher(HTTPMethod.POST, String.format("v1/namespaces/%s/views", namespaceName)), + matches(HTTPMethod.POST, String.format("v1/namespaces/%s/views", namespaceName)), eq(LoadViewResponse.class), any(), any()); @@ -249,13 +249,13 @@ public void viewExistsViaHEADRequest() { Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, "v1/config", Map.of(), Map.of()), + matches(HTTPMethod.GET, "v1/config", Map.of(), Map.of()), eq(ConfigResponse.class), any(), any()); Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.HEAD, "v1/namespaces/ns/views/view", Map.of(), Map.of()), + matches(HTTPMethod.HEAD, "v1/namespaces/ns/views/view", Map.of(), Map.of()), any(), any(), any()); @@ -296,13 +296,13 @@ public T execute( Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, "v1/config", Map.of(), Map.of()), + matches(HTTPMethod.GET, "v1/config", Map.of(), Map.of()), eq(ConfigResponse.class), any(), any()); Mockito.verify(adapter) .execute( - reqMatcher(HTTPMethod.GET, "v1/namespaces/ns/views/view", Map.of(), Map.of()), + matches(HTTPMethod.GET, "v1/namespaces/ns/views/view", Map.of(), Map.of()), any(), any(), any()); @@ -394,7 +394,7 @@ protected RESTViewOperations newViewOps( ResourcePaths resourcePaths = ResourcePaths.forCatalogProperties(Maps.newHashMap()); Mockito.verify(adapter, Mockito.atLeastOnce()) .execute( - reqMatcher(HTTPMethod.POST, resourcePaths.view(viewIdentifier), customHeaders), + matches(HTTPMethod.POST, resourcePaths.view(viewIdentifier), customHeaders), eq(LoadViewResponse.class), any(), any());