diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index dce7697319ff..4cc46ea8ce59 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -66,6 +66,7 @@ public class SerializableTable implements Table, HasTableOperations, Serializabl
   private final UUID uuid;
   private final int formatVersion;
   private final Try locationProviderTry;
+  private final TableMetadata tableMetadata;
 
   private transient volatile Table lazyTable = null;
   private transient volatile Schema lazySchema = null;
@@ -89,6 +90,13 @@ protected SerializableTable(Table table) {
     this.refs = SerializableMap.copyOf(table.refs());
     this.uuid = table.uuid();
     this.formatVersion = formatVersion(table);
+
+    // Only store full TableMetadata if the table requires remote scan planning.
+    if (table instanceof RequiresRemoteScanPlanning) {
+      this.tableMetadata = tableMetadata(table);
+    } else {
+      this.tableMetadata = null;
+    }
   }
 
   /**
@@ -136,13 +144,20 @@ private Table lazyTable() {
     if (lazyTable == null) {
       synchronized (this) {
         if (lazyTable == null) {
-          if (metadataFileLocation == null) {
+          TableOperations ops;
+
+          if (tableMetadata != null) {
+            // Use serialized TableMetadata (tables requiring remote scan planning)
+            // This avoids storage access for distributed query planning
+            ops = new StaticTableOperations(tableMetadata, io, locationProvider());
+          } else if (metadataFileLocation != null) {
+            // Read from storage using metadata location
+            ops = new StaticTableOperations(metadataFileLocation, io, locationProvider());
+          } else {
             throw new UnsupportedOperationException(
-                "Cannot load metadata: metadata file location is null");
+                "Cannot load metadata: both table metadata and metadata file location are null");
           }
 
-          TableOperations ops =
-              new StaticTableOperations(metadataFileLocation, io, locationProvider());
           this.lazyTable = newTable(ops, name);
         }
       }
@@ -186,6 +201,13 @@ private int formatVersion(Table table) {
     }
   }
 
+  private TableMetadata tableMetadata(Table table) {
+    if (table instanceof HasTableOperations) {
+      return ((HasTableOperations) table).operations().current();
+    }
+    return null;
+  }
+
   @Override
   public Schema schema() {
     if (lazySchema == null) {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 7dac5d401a80..eb180db85f78 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -197,7 +197,7 @@ public String toString() {
     }
   }
 
-  public static class MetadataLogEntry {
+  public static class MetadataLogEntry implements Serializable {
     private final long timestampMillis;
     private final String file;
 
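Note on the TableMetadata.java hunk above: because SerializableTable may now embed a full TableMetadata instance, Java serialization walks every field reachable from it, so each referenced type (including MetadataLogEntry) must itself be Serializable. The following is a minimal, self-contained sketch of that constraint; Entry and Metadata are hypothetical stand-ins, not the real Iceberg types.

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

public class NestedSerializationDemo {
  // Stand-in for MetadataLogEntry: without "implements Serializable", serializing
  // Metadata below would fail with java.io.NotSerializableException.
  static class Entry implements Serializable {
    final long timestampMillis;
    final String file;

    Entry(long timestampMillis, String file) {
      this.timestampMillis = timestampMillis;
      this.file = file;
    }
  }

  // Stand-in for TableMetadata: Serializable itself, but it only round-trips if every
  // reachable field type is Serializable too.
  static class Metadata implements Serializable {
    final List<Entry> previousFiles;

    Metadata(List<Entry> previousFiles) {
      this.previousFiles = previousFiles;
    }
  }

  public static void main(String[] args) throws Exception {
    Metadata metadata = new Metadata(List.of(new Entry(123L, "v1.metadata.json")));
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(metadata); // succeeds only because Entry is Serializable
    }
  }
}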
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index df4ba3214aea..9b9684cff650 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -62,9 +62,11 @@
 import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdatePartitionSpec;
 import org.apache.iceberg.UpdateSchema;
@@ -3528,4 +3530,175 @@ private static List allRequests(RESTCatalogAdapter adapter) {
     verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(), any());
     return captor.getAllValues();
   }
+
+  @Test
+  public void serializableTable() throws IOException, ClassNotFoundException {
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(2, "data", Types.StringType.get()));
+    restCatalog.createNamespace(TABLE.namespace());
+    Table table = restCatalog.createTable(TABLE, schema);
+
+    // Add data files to create snapshots
+    table.newAppend().appendFile(FILE_A).commit();
+    table.newAppend().appendFile(FILE_B).commit();
+
+    // Create SerializableTable from REST catalog
+    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+
+    // Test with Java serialization
+    Table javaSerialized = TestHelpers.roundTripSerialize(serializableTable);
+    verifySerializedTable(javaSerialized, table);
+
+    // Test with Kryo serialization
+    Table kryoSerialized = TestHelpers.KryoHelpers.roundTripSerialize(serializableTable);
+    verifySerializedTable(kryoSerialized, table);
+  }
+
+  private void verifySerializedTable(Table deserialized, Table original) {
+    // Verify that basic operations work using serialized table metadata
+    // (tables requiring remote scan planning serialize full metadata)
+    assertThat(deserialized.schema().asStruct()).isEqualTo(original.schema().asStruct());
+    assertThat(deserialized.spec().specId()).isEqualTo(original.spec().specId());
+    assertThat(deserialized.location()).isEqualTo(original.location());
+    assertThat(deserialized.properties()).containsAllEntriesOf(original.properties());
+
+    // Verify snapshot operations work using serialized metadata
+    assertThat(deserialized.currentSnapshot()).isNotNull();
+    assertThat(deserialized.currentSnapshot().snapshotId())
+        .isEqualTo(original.currentSnapshot().snapshotId());
+
+    // Verify snapshots are accessible
+    assertThat(deserialized.snapshots()).isNotNull().hasSameSizeAs(original.snapshots());
+
+    // Verify scan operations work
+    assertThat(deserialized.newScan()).isNotNull();
+  }
+
+  @Test
+  public void serializableTableWithSchemaEvolution() throws IOException, ClassNotFoundException {
+    // Create initial table
+    Schema initialSchema =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(2, "data", Types.StringType.get()));
+
+    TableIdentifier ident = TableIdentifier.of(Namespace.of("ns"), "schema_evolution_table");
+    restCatalog.createNamespace(ident.namespace());
+    Table table = restCatalog.createTable(ident, initialSchema);
+
+    // Evolve schema
+    table.updateSchema().addColumn("new_col1", Types.IntegerType.get()).commit();
+    table.newAppend().appendFile(FILE_A).commit();
+
+    // Evolve schema again
+    table.updateSchema().addColumn("new_col2", Types.StringType.get()).commit();
+    table.newAppend().appendFile(FILE_B).commit();
+
+    // Create and serialize table
+    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+
+    // Test with Java serialization
+    Table javaSerialized = TestHelpers.roundTripSerialize(serializableTable);
+    verifySchemaEvolution(javaSerialized, table);
+
+    // Test with Kryo serialization
+    Table kryoSerialized = TestHelpers.KryoHelpers.roundTripSerialize(serializableTable);
+    verifySchemaEvolution(kryoSerialized, table);
+  }
+
+  private void verifySchemaEvolution(Table deserialized, Table original) {
+    // Verify current schema
+    assertThat(deserialized.schema().columns()).hasSameSizeAs(original.schema().columns());
+
+    // Verify all historical schemas are preserved
+    assertThat(deserialized.schemas()).isNotNull().hasSameSizeAs(original.schemas());
+
+    // Verify scans work
+    assertThat(deserialized.newScan()).isNotNull();
+  }
+
+  @Test
+  public void serializableTableSnapshotRefs() throws IOException, ClassNotFoundException {
+    // Create table with data
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(2, "data", Types.StringType.get()));
+
+    TableIdentifier ident = TableIdentifier.of(Namespace.of("ns"), "refs_table");
+    restCatalog.createNamespace(ident.namespace());
+    Table table = restCatalog.createTable(ident, schema);
+
+    // Create first snapshot and tag it
+    table.newAppend().appendFile(FILE_A).commit();
+    long taggedSnapshotId = table.currentSnapshot().snapshotId();
+    table
+        .manageSnapshots()
+        .createTag("production", taggedSnapshotId)
+        .setMaxRefAgeMs("production", Long.MAX_VALUE)
+        .commit();
+
+    // Add more data
+    table.newAppend().appendFile(FILE_B).commit();
+
+    // Serialize and deserialize
+    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
+
+    // Test with Java serialization
+    Table javaSerialized = TestHelpers.roundTripSerialize(serializableTable);
+    assertThat(javaSerialized.refs()).containsEntry("production", table.refs().get("production"));
+    assertThat(javaSerialized.snapshot(taggedSnapshotId)).isNotNull();
+
+    // Test with Kryo serialization
+    Table kryoSerialized = TestHelpers.KryoHelpers.roundTripSerialize(serializableTable);
+    assertThat(kryoSerialized.refs()).containsEntry("production", table.refs().get("production"));
+    assertThat(kryoSerialized.snapshot(taggedSnapshotId)).isNotNull();
+  }
+
+  @Test
+  public void serializableTableConditionalMetadataSerialization()
+      throws IOException, ClassNotFoundException {
+    // Create a table with data through REST catalog
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(2, "data", Types.StringType.get()));
+
+    TableIdentifier ident = TableIdentifier.of(Namespace.of("ns"), "conditional_test");
+    restCatalog.createNamespace(ident.namespace());
+    Table table = restCatalog.createTable(ident, schema);
+
+    // Add data with multiple snapshots
+    table.newAppend().appendFile(FILE_A).commit();
+    table.newAppend().appendFile(FILE_B).commit();
+
+    // Case 1: Normal SerializableTable (with metadata location from REST catalog)
+    SerializableTable serializableWithLocation =
+        (SerializableTable) SerializableTable.copyOf(table);
+
+    // Test with Java serialization
+    Table javaDeserialized = TestHelpers.roundTripSerialize(serializableWithLocation);
+    verifyConditionalSerialization(javaDeserialized, table);
+
+    // Test with Kryo serialization
+    Table kryoDeserialized = TestHelpers.KryoHelpers.roundTripSerialize(serializableWithLocation);
+    verifyConditionalSerialization(kryoDeserialized, table);
+  }
+
+  private void verifyConditionalSerialization(Table deserialized, Table original) {
+    // Verify it works
+    assertThat(deserialized.currentSnapshot()).isNotNull();
+    assertThat(deserialized.currentSnapshot().snapshotId())
+        .isEqualTo(original.currentSnapshot().snapshotId());
+    assertThat(deserialized.snapshots()).isNotNull().hasSameSizeAs(original.snapshots());
+
+    // Test the optimization - when table requires remote scan planning,
+    // tableMetadata is always serialized (enabling distributed query planning)
+    // For other tables, tableMetadata is not serialized (saving space)
+    // This test verifies the SerializableTable works correctly for both cases
+    assertThat(deserialized.schema().asStruct()).isEqualTo(original.schema().asStruct());
+    assertThat(deserialized.location()).isEqualTo(original.location());
+  }
 }
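A minimal usage sketch of what the patch enables outside the test suite: code can round-trip a SerializableTable through plain Java serialization and read snapshot information on the receiving side; for tables that require remote scan planning, that lookup is now served from the embedded TableMetadata instead of a metadata-file read, while other tables still fall back to metadataFileLocation. The helper class and method name below are illustrative assumptions, not part of the patch, and the sketch assumes the input table already has at least one snapshot.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;

public class SerializableTableRoundTrip {
  // Hypothetical helper: wrap a table as SerializableTable, serialize and deserialize it,
  // then resolve the current snapshot id on the "remote" side.
  static long remoteSnapshotId(Table table) throws IOException, ClassNotFoundException {
    Table serializable = SerializableTable.copyOf(table);

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(serializable);
    }

    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      Table deserialized = (Table) in.readObject();
      // Served from the embedded TableMetadata when the table requires remote scan
      // planning; otherwise resolved via StaticTableOperations and metadataFileLocation.
      return deserialized.currentSnapshot().snapshotId();
    }
  }
}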