Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 78 additions & 40 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -80,6 +81,12 @@ enum TransactionType {
private boolean hasLastOpCommitted;
private final MetricsReporter reporter;

private Schema replaceSchema;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: This change turned out to be more breaking than I expected. If we want to proceed, see if this can be cleaned up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Realise this is the sort of change that'd require a dev list discussion - I wanted to experiment with this approach first)

private PartitionSpec replaceSpec;
private SortOrder replaceSortOrder;
private String replaceLocation;
private Map<String, String> replaceProperties;

BaseTransaction(
String tableName, TableOperations ops, TransactionType type, TableMetadata start) {
this(tableName, ops, type, start, LoggingMetricsReporter.instance());
Expand All @@ -101,6 +108,17 @@ enum TransactionType {
this.type = type;
this.hasLastOpCommitted = true;
this.reporter = reporter;

// For replace-style transactions, the provided TableMetadata contains the information needed to
// build the replaced table state. This is stored so the replacement can be re-applied on top of
// refreshed metadata on commit.
if (type == TransactionType.REPLACE_TABLE || type == TransactionType.CREATE_OR_REPLACE_TABLE) {
this.replaceSchema = start.schema();
this.replaceSpec = start.spec();
this.replaceSortOrder = start.sortOrder();
this.replaceLocation = start.location();
this.replaceProperties = ImmutableMap.copyOf(start.properties());
}
}

@Override
Expand Down Expand Up @@ -260,12 +278,8 @@ public void commitTransaction() {
commitCreateTransaction();
break;

case REPLACE_TABLE:
commitReplaceTransaction(false);
break;

case CREATE_OR_REPLACE_TABLE:
commitReplaceTransaction(true);
case REPLACE_TABLE, CREATE_OR_REPLACE_TABLE:
commitReplaceTransaction();
break;

case SIMPLE:
Expand Down Expand Up @@ -298,9 +312,13 @@ private void commitCreateTransaction() {
}
}

private void commitReplaceTransaction(boolean orCreate) {
private void commitReplaceTransaction() {
Map<String, String> props = base != null ? base.properties() : current.properties();

Set<Long> startingSnapshots =
base != null
? base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet())
: ImmutableSet.of();
try {
Tasks.foreach(ops)
.retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
Expand All @@ -315,40 +333,27 @@ private void commitReplaceTransaction(boolean orCreate) {
.onlyRetryOn(CommitFailedException.class)
.run(
underlyingOps -> {
try {
underlyingOps.refresh();
} catch (NoSuchTableException e) {
if (!orCreate) {
throw e;
}
}

// because this is a replace table, it will always completely replace the table
// metadata. even if it was just updated.
if (base != underlyingOps.current()) {
this.base = underlyingOps.current(); // just refreshed
}
applyUpdates(underlyingOps);

underlyingOps.commit(base, current);
});

} catch (CommitStateUnknownException e) {
throw e;

} catch (PendingUpdateFailedException e) {
cleanUpOnCommitFailure();
throw e.wrapped();

} catch (RuntimeException e) {
// the commit failed and no files were committed. clean up each update.
if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) {
cleanAllUpdates();
cleanUpOnCommitFailure();
}

throw e;

} finally {
// replace table never needs to retry because the table state is completely replaced. because
// retries are not
// a concern, it is safe to delete all the deleted files from individual operations
deleteUncommittedFiles(deletedFiles);
}

cleanUpAfterCommitSuccess(startingSnapshots);
}

private void commitSimpleTransaction() {
Expand Down Expand Up @@ -381,6 +386,7 @@ private void commitSimpleTransaction() {
} catch (PendingUpdateFailedException e) {
cleanUpOnCommitFailure();
throw e.wrapped();

} catch (RuntimeException e) {
if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) {
cleanUpOnCommitFailure();
Expand All @@ -389,8 +395,18 @@ private void commitSimpleTransaction() {
throw e;
}

// the commit succeeded
cleanUpAfterCommitSuccess(startingSnapshots);
}

private void cleanUpOnCommitFailure() {
// the commit failed and no files were committed. clean up each update.
cleanAllUpdates();

// delete all the uncommitted files
deleteUncommittedFiles(deletedFiles);
}

private void cleanUpAfterCommitSuccess(Set<Long> startingSnapshots) {
try {
// clean up the data files that were deleted by each operation. first, get the list of
// committed manifests to ensure that no committed manifest is deleted.
Expand Down Expand Up @@ -420,14 +436,6 @@ private void commitSimpleTransaction() {
}
}

private void cleanUpOnCommitFailure() {
// the commit failed and no files were committed. clean up each update.
cleanAllUpdates();

// delete all the uncommitted files
deleteUncommittedFiles(deletedFiles);
}

private void cleanAllUpdates() {
Tasks.foreach(updates)
.suppressFailureWhenFinished()
Expand Down Expand Up @@ -459,10 +467,40 @@ private void deleteUncommittedFiles(Iterable<String> paths) {
}

private void applyUpdates(TableOperations underlyingOps) {
if (base != underlyingOps.refresh()) {
// use refreshed the metadata
try {
underlyingOps.refresh();
Copy link
Contributor Author

@smaheshwar-pltr smaheshwar-pltr Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: TestHiveCreateReplaceTable#testCreateOrReplaceTableTxnTableDeletedConcurrently shows an NPE where this refresh actually returns null instead of NoSuchTableException being thrown. Consider handling that here, fixing if it's a bug, or leaving for now (as that's maybe how concurrent appends with dropped failed prior to this PR)

} catch (NoSuchTableException e) {
if (type == TransactionType.CREATE_OR_REPLACE_TABLE) {
return;
}
throw e;
}

if (base != underlyingOps.current()) {
// use the refreshed metadata
this.base = underlyingOps.current();
this.current = underlyingOps.current();

this.current =
switch (type) {
case REPLACE_TABLE, CREATE_OR_REPLACE_TABLE ->
// Even if we are dealing with a replace-style transaction, we need to re-apply
// updates on top of the refreshed metadata's replacement, because of (1) possible
// row lineage requirements, and (2) to not overwrite the metadata with an outdated
// replacement that may cause history loss or table corruption.
underlyingOps
.current()
.buildReplacement(
replaceSchema,
replaceSpec,
replaceSortOrder,
replaceLocation,
replaceProperties);
case SIMPLE -> underlyingOps.current();
case CREATE_TABLE ->
throw new IllegalStateException(
"Transaction update application not expected for create transactions");
};

for (PendingUpdate update : updates) {
// re-commit each update in the chain to apply it and update current
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ enum UpdateType {
private final Supplier<Map<String, String>> mutationHeaders;
private final FileIO io;
private final List<MetadataUpdate> createChanges;
private final TableMetadata replaceBase;
private final Set<Endpoint> endpoints;
private UpdateType updateType;
private TableMetadata current;
Expand Down Expand Up @@ -132,7 +131,6 @@ enum UpdateType {
this.io = io;
this.updateType = updateType;
this.createChanges = createChanges;
this.replaceBase = current;
if (updateType == UpdateType.CREATE) {
this.current = null;
} else {
Expand Down Expand Up @@ -179,8 +177,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
.addAll(createChanges)
.addAll(metadata.changes())
.build();
// use the original replace base metadata because the transaction will refresh
requirements = UpdateRequirements.forReplaceTable(replaceBase, updates);
requirements = UpdateRequirements.forReplaceTable(base, updates);
errorHandler = ErrorHandlers.tableCommitHandler();
break;

Expand Down
47 changes: 27 additions & 20 deletions core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -2655,15 +2655,20 @@ public void testReplaceTableKeepsSnapshotLog() {
.containsExactly(snapshotBeforeReplace, snapshotAfterReplace);
}

@Test
public void testConcurrentReplaceTransactions() {
@ParameterizedTest
@ValueSource(ints = {2, 3})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#15091 shows the failure of this V3 test, prior to this PR

public void testConcurrentReplaceTransactions(int formatVersion) {
C catalog = catalog();

if (requiresNamespaceCreate()) {
catalog.createNamespace(NS);
}

Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction();
Transaction transaction =
catalog
.buildTable(TABLE, SCHEMA)
.withProperty("format-version", String.valueOf(formatVersion))
.createTransaction();
transaction.newFastAppend().appendFile(FILE_A).commit();
transaction.commitTransaction();

Expand Down Expand Up @@ -2691,6 +2696,8 @@ public void testConcurrentReplaceTransactions() {
secondReplace.commitTransaction();

Table afterSecondReplace = catalog.loadTable(TABLE);
// All three successfully committed snapshots should be present
assertThat(afterSecondReplace.snapshots()).hasSize(3);
Comment on lines +2699 to +2700
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#15090 shows the failure of this added line, prior to this PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you just add a new test please to show where exactly stuff fails with V3?

Copy link
Contributor Author

@smaheshwar-pltr smaheshwar-pltr Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense - I've updated the PR description in the meantime to cover this.
(Any concurrent change to a table's snapshots causes the replace transaction to fail entirely for the REST catalog, due to server-side row-lineage validation. I'll put up an issue to track, actually)

assertThat(afterSecondReplace.schema().asStruct())
.as("Table schema should match the original schema")
.isEqualTo(original.schema().asStruct());
Expand Down Expand Up @@ -2734,9 +2741,15 @@ public void testConcurrentReplaceTransactionSchema() {
secondReplace.commitTransaction();

Table afterSecondReplace = catalog.loadTable(TABLE);

// The second replace will be rebased on the first replace, so new field IDs will be assigned
assertThat(afterSecondReplace.schema().asStruct())
.as("Table schema should match the original schema")
.isEqualTo(original.schema().asStruct());
.as("Table schema should differ from the original schema")
.isNotEqualTo(original.schema().asStruct());
assertThat(afterSecondReplace.schema().select("name", "type").asStruct())
.as("Table schema should match the original schema in names and types")
.isEqualTo(original.schema().select("name", "type").asStruct());

assertUUIDsMatch(original, afterSecondReplace);
assertFiles(afterSecondReplace, FILE_C);
}
Expand Down Expand Up @@ -2782,8 +2795,6 @@ public void testConcurrentReplaceTransactionSchema2() {

@Test
public void testConcurrentReplaceTransactionSchemaConflict() {
assumeThat(supportsServerSideRetry()).as("Schema conflicts are detected server-side").isTrue();

C catalog = catalog();

if (requiresNamespaceCreate()) {
Expand Down Expand Up @@ -2812,12 +2823,11 @@ public void testConcurrentReplaceTransactionSchemaConflict() {
assertUUIDsMatch(original, afterFirstReplace);
assertFiles(afterFirstReplace, FILE_B);

// even though the new schema is identical, the assertion that the last assigned id has not
// changed will fail
assertThatThrownBy(secondReplace::commitTransaction)
.isInstanceOf(CommitFailedException.class)
.hasMessageStartingWith(
"Commit failed: Requirement failed: last assigned field id changed");
secondReplace.commitTransaction();
Table afterSecondReplace = catalog.loadTable(TABLE);
assertThat(afterSecondReplace.schema().asStruct())
.as("Table schema should match the new schema")
.isEqualTo(REPLACE_SCHEMA.asStruct());
}

@Test
Expand Down Expand Up @@ -2902,7 +2912,6 @@ public void testConcurrentReplaceTransactionPartitionSpec2() {

@Test
public void testConcurrentReplaceTransactionPartitionSpecConflict() {
assumeThat(supportsServerSideRetry()).as("Spec conflicts are detected server-side").isTrue();
C catalog = catalog();

if (requiresNamespaceCreate()) {
Expand Down Expand Up @@ -2932,12 +2941,10 @@ public void testConcurrentReplaceTransactionPartitionSpecConflict() {
assertUUIDsMatch(original, afterFirstReplace);
assertFiles(afterFirstReplace, FILE_B);

// even though the new spec is identical, the assertion that the last assigned id has not
// changed will fail
assertThatThrownBy(secondReplace::commitTransaction)
.isInstanceOf(CommitFailedException.class)
.hasMessageStartingWith(
"Commit failed: Requirement failed: last assigned partition id changed");
secondReplace.commitTransaction();
assertThat(catalog.loadTable(TABLE).spec().fields())
.as("Table spec should match the new spec")
.isEqualTo(TABLE_SPEC.fields());
}

@Test
Expand Down
Loading