Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions core/src/main/java/org/apache/iceberg/SerializableTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class SerializableTable implements Table, HasTableOperations, Serializabl
private final UUID uuid;
private final int formatVersion;
private final Try<LocationProvider> locationProviderTry;
private final TableMetadata tableMetadata;

private transient volatile Table lazyTable = null;
private transient volatile Schema lazySchema = null;
Expand All @@ -89,6 +90,13 @@ protected SerializableTable(Table table) {
this.refs = SerializableMap.copyOf(table.refs());
this.uuid = table.uuid();
this.formatVersion = formatVersion(table);

// Only store full TableMetadata if the table requires remote scan planning.
if (table instanceof RequiresRemoteScanPlanning) {
this.tableMetadata = tableMetadata(table);
} else {
this.tableMetadata = null;
}
}

/**
Expand Down Expand Up @@ -136,13 +144,20 @@ private Table lazyTable() {
if (lazyTable == null) {
synchronized (this) {
if (lazyTable == null) {
if (metadataFileLocation == null) {
TableOperations ops;

if (tableMetadata != null) {
// Use serialized TableMetadata (tables requiring remote scan planning)
// This avoids storage access for distributed query planning
ops = new StaticTableOperations(tableMetadata, io, locationProvider());
} else if (metadataFileLocation != null) {
// Read from storage using metadata location
ops = new StaticTableOperations(metadataFileLocation, io, locationProvider());
} else {
throw new UnsupportedOperationException(
"Cannot load metadata: metadata file location is null");
"Cannot load metadata: both table metadata and metadata file location are null");
}

TableOperations ops =
new StaticTableOperations(metadataFileLocation, io, locationProvider());
this.lazyTable = newTable(ops, name);
}
}
Expand Down Expand Up @@ -186,6 +201,13 @@ private int formatVersion(Table table) {
}
}

/**
 * Looks up the current {@link TableMetadata} of a table through its table operations.
 *
 * @param table the table to inspect
 * @return {@code operations().current()} when the table implements {@link HasTableOperations},
 *     otherwise {@code null}
 */
private TableMetadata tableMetadata(Table table) {
  // Tables that do not expose their operations have no metadata to hand out.
  if (!(table instanceof HasTableOperations)) {
    return null;
  }

  HasTableOperations withOperations = (HasTableOperations) table;
  return withOperations.operations().current();
}

@Override
public Schema schema() {
if (lazySchema == null) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public String toString() {
}
}

public static class MetadataLogEntry {
public static class MetadataLogEntry implements Serializable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should fix this regardless of the rest of this change, IMHO.

private final long timestampMillis;
private final String file;

Expand Down
173 changes: 173 additions & 0 deletions core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateSchema;
Expand Down Expand Up @@ -3528,4 +3530,175 @@ private static List<HTTPRequest> allRequests(RESTCatalogAdapter adapter) {
verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(), any());
return captor.getAllValues();
}

/**
 * Round-trips a {@link SerializableTable} copy of a REST catalog table through Java serialization
 * and Kryo, and checks each result against the source table.
 */
@Test
public void serializableTable() throws IOException, ClassNotFoundException {
  Schema tableSchema =
      new Schema(
          required(1, "id", Types.IntegerType.get()),
          required(2, "data", Types.StringType.get()));
  restCatalog.createNamespace(TABLE.namespace());
  Table source = restCatalog.createTable(TABLE, tableSchema);

  // Two separate append commits so the table has snapshots to compare.
  source.newAppend().appendFile(FILE_A).commit();
  source.newAppend().appendFile(FILE_B).commit();

  SerializableTable copy = (SerializableTable) SerializableTable.copyOf(source);

  // Java serialization first, then Kryo; the same verification runs on both results.
  verifySerializedTable(TestHelpers.roundTripSerialize(copy), source);
  verifySerializedTable(TestHelpers.KryoHelpers.roundTripSerialize(copy), source);
}

/**
 * Asserts that a deserialized table exposes the same basic state as the table it was copied from.
 *
 * @param deserialized the table obtained from a serialization round trip
 * @param original the table the serializable copy was created from
 */
private void verifySerializedTable(Table deserialized, Table original) {
  // Schema struct, partition spec id, location and properties must carry over.
  assertThat(deserialized.schema().asStruct()).isEqualTo(original.schema().asStruct());
  assertThat(deserialized.spec().specId()).isEqualTo(original.spec().specId());
  assertThat(deserialized.location()).isEqualTo(original.location());
  assertThat(deserialized.properties()).containsAllEntriesOf(original.properties());

  // The current snapshot must exist and carry the same id as on the source table.
  Snapshot restoredCurrent = deserialized.currentSnapshot();
  assertThat(restoredCurrent).isNotNull();
  assertThat(restoredCurrent.snapshotId()).isEqualTo(original.currentSnapshot().snapshotId());

  // Only the number of snapshots is compared, not their contents.
  Iterable<Snapshot> restoredSnapshots = deserialized.snapshots();
  assertThat(restoredSnapshots).isNotNull();
  assertThat(restoredSnapshots).hasSameSizeAs(original.snapshots());

  // A scan can be created from the deserialized table.
  assertThat(deserialized.newScan()).isNotNull();
}

/**
 * Round-trips a {@link SerializableTable} copy of a table whose schema was changed twice, with an
 * append after each change, through Java serialization and Kryo.
 */
@Test
public void serializableTableWithSchemaEvolution() throws IOException, ClassNotFoundException {
  Schema baseSchema =
      new Schema(
          required(1, "id", Types.IntegerType.get()),
          required(2, "data", Types.StringType.get()));

  TableIdentifier tableId = TableIdentifier.of(Namespace.of("ns"), "schema_evolution_table");
  restCatalog.createNamespace(tableId.namespace());
  Table evolving = restCatalog.createTable(tableId, baseSchema);

  // First schema change, followed by an append.
  evolving.updateSchema().addColumn("new_col1", Types.IntegerType.get()).commit();
  evolving.newAppend().appendFile(FILE_A).commit();

  // Second schema change, followed by another append.
  evolving.updateSchema().addColumn("new_col2", Types.StringType.get()).commit();
  evolving.newAppend().appendFile(FILE_B).commit();

  SerializableTable copy = (SerializableTable) SerializableTable.copyOf(evolving);

  // Java serialization first, then Kryo; the same verification runs on both results.
  verifySchemaEvolution(TestHelpers.roundTripSerialize(copy), evolving);
  verifySchemaEvolution(TestHelpers.KryoHelpers.roundTripSerialize(copy), evolving);
}

/**
 * Asserts that a deserialized table reports as many columns and as many schemas as the original.
 * Only sizes are compared; individual columns and schemas are not inspected.
 *
 * @param deserialized the table obtained from a serialization round trip
 * @param original the table the serializable copy was created from
 */
private void verifySchemaEvolution(Table deserialized, Table original) {
  // Column count of the current schema.
  List<Types.NestedField> restoredColumns = deserialized.schema().columns();
  assertThat(restoredColumns).hasSameSizeAs(original.schema().columns());

  // Number of schemas known to the table.
  assertThat(deserialized.schemas()).isNotNull().hasSameSizeAs(original.schemas());

  // A scan can be created from the deserialized table.
  assertThat(deserialized.newScan()).isNotNull();
}

/**
 * Checks that a tag created on an earlier snapshot, and the snapshot it points to, are still
 * reachable after a {@link SerializableTable} copy is round-tripped through Java serialization and
 * Kryo.
 */
@Test
public void serializableTableSnapshotRefs() throws IOException, ClassNotFoundException {
  Schema tableSchema =
      new Schema(
          required(1, "id", Types.IntegerType.get()),
          required(2, "data", Types.StringType.get()));

  TableIdentifier tableId = TableIdentifier.of(Namespace.of("ns"), "refs_table");
  restCatalog.createNamespace(tableId.namespace());
  Table source = restCatalog.createTable(tableId, tableSchema);

  // Tag the snapshot produced by the first append.
  String tagName = "production";
  source.newAppend().appendFile(FILE_A).commit();
  long taggedSnapshotId = source.currentSnapshot().snapshotId();
  source
      .manageSnapshots()
      .createTag(tagName, taggedSnapshotId)
      .setMaxRefAgeMs(tagName, Long.MAX_VALUE)
      .commit();

  // A second append moves the current snapshot past the tagged one.
  source.newAppend().appendFile(FILE_B).commit();

  SerializableTable copy = (SerializableTable) SerializableTable.copyOf(source);

  // Java serialization round trip.
  Table viaJava = TestHelpers.roundTripSerialize(copy);
  assertThat(viaJava.refs()).containsEntry(tagName, source.refs().get(tagName));
  assertThat(viaJava.snapshot(taggedSnapshotId)).isNotNull();

  // Kryo round trip.
  Table viaKryo = TestHelpers.KryoHelpers.roundTripSerialize(copy);
  assertThat(viaKryo.refs()).containsEntry(tagName, source.refs().get(tagName));
  assertThat(viaKryo.snapshot(taggedSnapshotId)).isNotNull();
}

/**
 * Round-trips a {@link SerializableTable} copy of a REST catalog table with two appends through
 * Java serialization and Kryo. Only this one scenario is exercised; the test does not build a
 * second table variant to compare against.
 */
@Test
public void serializableTableConditionalMetadataSerialization()
    throws IOException, ClassNotFoundException {
  Schema tableSchema =
      new Schema(
          required(1, "id", Types.IntegerType.get()),
          required(2, "data", Types.StringType.get()));

  TableIdentifier tableId = TableIdentifier.of(Namespace.of("ns"), "conditional_test");
  restCatalog.createNamespace(tableId.namespace());
  Table source = restCatalog.createTable(tableId, tableSchema);

  // Two separate append commits so the table has more than one snapshot.
  source.newAppend().appendFile(FILE_A).commit();
  source.newAppend().appendFile(FILE_B).commit();

  // Serializable copy of the table as loaded from the REST catalog.
  SerializableTable copy = (SerializableTable) SerializableTable.copyOf(source);

  // Java serialization first, then Kryo; the same verification runs on both results.
  verifyConditionalSerialization(TestHelpers.roundTripSerialize(copy), source);
  verifyConditionalSerialization(TestHelpers.KryoHelpers.roundTripSerialize(copy), source);
}

/**
 * Asserts that a deserialized table matches the original on current snapshot id, snapshot count,
 * schema struct and location.
 *
 * <p>These checks look only at observable table state. They do not distinguish a copy that
 * carried full table metadata from one that carried only a metadata file location.
 *
 * @param deserialized the table obtained from a serialization round trip
 * @param original the table the serializable copy was created from
 */
private void verifyConditionalSerialization(Table deserialized, Table original) {
  // The current snapshot must exist and carry the same id as on the source table.
  Snapshot restoredCurrent = deserialized.currentSnapshot();
  assertThat(restoredCurrent).isNotNull();
  assertThat(restoredCurrent.snapshotId()).isEqualTo(original.currentSnapshot().snapshotId());

  // Only the number of snapshots is compared, not their contents.
  Iterable<Snapshot> restoredSnapshots = deserialized.snapshots();
  assertThat(restoredSnapshots).isNotNull();
  assertThat(restoredSnapshots).hasSameSizeAs(original.snapshots());

  // Schema struct and location must carry over.
  assertThat(deserialized.schema().asStruct()).isEqualTo(original.schema().asStruct());
  assertThat(deserialized.location()).isEqualTo(original.location());
}
}