diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 936706aa8f93..12adbd6d886e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -47,6 +47,7 @@
 import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -80,6 +81,12 @@ enum TransactionType {
   private boolean hasLastOpCommitted;
   private final MetricsReporter reporter;
 
+  private Schema replaceSchema;
+  private PartitionSpec replaceSpec;
+  private SortOrder replaceSortOrder;
+  private String replaceLocation;
+  private Map<String, String> replaceProperties;
+
   BaseTransaction(
       String tableName, TableOperations ops, TransactionType type, TableMetadata start) {
     this(tableName, ops, type, start, LoggingMetricsReporter.instance());
@@ -101,6 +108,17 @@ enum TransactionType {
     this.type = type;
     this.hasLastOpCommitted = true;
     this.reporter = reporter;
+
+    // For replace-style transactions, the provided TableMetadata contains the information needed to
+    // build the replaced table state. This is stored so the replacement can be re-applied on top of
+    // refreshed metadata on commit.
+    if (type == TransactionType.REPLACE_TABLE || type == TransactionType.CREATE_OR_REPLACE_TABLE) {
+      this.replaceSchema = start.schema();
+      this.replaceSpec = start.spec();
+      this.replaceSortOrder = start.sortOrder();
+      this.replaceLocation = start.location();
+      this.replaceProperties = ImmutableMap.copyOf(start.properties());
+    }
   }
 
   @Override
@@ -260,12 +278,8 @@ public void commitTransaction() {
         commitCreateTransaction();
         break;
 
-      case REPLACE_TABLE:
-        commitReplaceTransaction(false);
-        break;
-
-      case CREATE_OR_REPLACE_TABLE:
-        commitReplaceTransaction(true);
+      case REPLACE_TABLE, CREATE_OR_REPLACE_TABLE:
+        commitReplaceTransaction();
         break;
 
       case SIMPLE:
@@ -298,9 +312,13 @@ private void commitCreateTransaction() {
     }
   }
 
-  private void commitReplaceTransaction(boolean orCreate) {
+  private void commitReplaceTransaction() {
     Map<String, String> props = base != null ? base.properties() : current.properties();
+    Set<Long> startingSnapshots =
+        base != null
+            ? base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet())
+            : ImmutableSet.of();
 
     try {
       Tasks.foreach(ops)
           .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
@@ -315,19 +333,7 @@ private void commitReplaceTransaction(boolean orCreate) {
           .onlyRetryOn(CommitFailedException.class)
           .run(
              underlyingOps -> {
-                try {
-                  underlyingOps.refresh();
-                } catch (NoSuchTableException e) {
-                  if (!orCreate) {
-                    throw e;
-                  }
-                }
-
-                // because this is a replace table, it will always completely replace the table
-                // metadata. even if it was just updated.
-                if (base != underlyingOps.current()) {
-                  this.base = underlyingOps.current(); // just refreshed
-                }
+                applyUpdates(underlyingOps);
 
                 underlyingOps.commit(base, current);
               });
@@ -335,20 +341,19 @@ private void commitReplaceTransaction(boolean orCreate) {
     } catch (CommitStateUnknownException e) {
       throw e;
 
+    } catch (PendingUpdateFailedException e) {
+      cleanUpOnCommitFailure();
+      throw e.wrapped();
+
     } catch (RuntimeException e) {
-      // the commit failed and no files were committed. clean up each update.
       if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) {
-        cleanAllUpdates();
+        cleanUpOnCommitFailure();
       }
 
       throw e;
-
-    } finally {
-      // replace table never needs to retry because the table state is completely replaced. because
-      // retries are not
-      // a concern, it is safe to delete all the deleted files from individual operations
-      deleteUncommittedFiles(deletedFiles);
     }
+
+    cleanUpAfterCommitSuccess(startingSnapshots);
   }
 
   private void commitSimpleTransaction() {
@@ -381,6 +386,7 @@ private void commitSimpleTransaction() {
     } catch (PendingUpdateFailedException e) {
       cleanUpOnCommitFailure();
       throw e.wrapped();
+
     } catch (RuntimeException e) {
       if (!ops.requireStrictCleanup() || e instanceof CleanableFailure) {
         cleanUpOnCommitFailure();
@@ -389,8 +395,18 @@ private void commitSimpleTransaction() {
       }
       throw e;
     }
 
-    // the commit succeeded
+    cleanUpAfterCommitSuccess(startingSnapshots);
+  }
+
+  private void cleanUpOnCommitFailure() {
+    // the commit failed and no files were committed. clean up each update.
+    cleanAllUpdates();
+
+    // delete all the uncommitted files
+    deleteUncommittedFiles(deletedFiles);
+  }
+
+  private void cleanUpAfterCommitSuccess(Set<Long> startingSnapshots) {
     try {
       // clean up the data files that were deleted by each operation. first, get the list of
       // committed manifests to ensure that no committed manifest is deleted.
@@ -420,14 +436,6 @@ private void commitSimpleTransaction() {
     }
   }
 
-  private void cleanUpOnCommitFailure() {
-    // the commit failed and no files were committed. clean up each update.
-    cleanAllUpdates();
-
-    // delete all the uncommitted files
-    deleteUncommittedFiles(deletedFiles);
-  }
-
   private void cleanAllUpdates() {
     Tasks.foreach(updates)
         .suppressFailureWhenFinished()
@@ -459,10 +467,40 @@ private void deleteUncommittedFiles(Iterable<String> paths) {
   }
 
   private void applyUpdates(TableOperations underlyingOps) {
-    if (base != underlyingOps.refresh()) {
-      // use refreshed the metadata
+    try {
+      underlyingOps.refresh();
+    } catch (NoSuchTableException e) {
+      if (type == TransactionType.CREATE_OR_REPLACE_TABLE) {
+        return;
+      }
+      throw e;
+    }
+
+    if (base != underlyingOps.current()) {
+      // use the refreshed metadata
       this.base = underlyingOps.current();
-      this.current = underlyingOps.current();
+
+      this.current =
+          switch (type) {
+            case REPLACE_TABLE, CREATE_OR_REPLACE_TABLE ->
+                // Even if we are dealing with a replace-style transaction, we need to re-apply
+                // updates on top of the refreshed metadata's replacement, because of (1) possible
+                // row lineage requirements, and (2) to not overwrite the metadata with an outdated
+                // replacement that may cause history loss or table corruption.
+                underlyingOps
+                    .current()
+                    .buildReplacement(
+                        replaceSchema,
+                        replaceSpec,
+                        replaceSortOrder,
+                        replaceLocation,
+                        replaceProperties);
+            case SIMPLE -> underlyingOps.current();
+            case CREATE_TABLE ->
+                throw new IllegalStateException(
+                    "Transaction update application not expected for create transactions");
+          };
+
       for (PendingUpdate update : updates) {
         // re-commit each update in the chain to apply it and update current
         try {
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
index d2a6ab618ca8..cf6dee85788d 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java
@@ -59,7 +59,6 @@ enum UpdateType {
   private final Supplier<Map<String, String>> mutationHeaders;
   private final FileIO io;
   private final List<MetadataUpdate> createChanges;
-  private final TableMetadata replaceBase;
   private final Set<Endpoint> endpoints;
   private UpdateType updateType;
   private TableMetadata current;
@@ -132,7 +131,6 @@ enum UpdateType {
     this.io = io;
     this.updateType = updateType;
     this.createChanges = createChanges;
-    this.replaceBase = current;
     if (updateType == UpdateType.CREATE) {
       this.current = null;
     } else {
@@ -179,8 +177,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
                 .addAll(createChanges)
                 .addAll(metadata.changes())
                 .build();
-        // use the original replace base metadata because the transaction will refresh
-        requirements = UpdateRequirements.forReplaceTable(replaceBase, updates);
+        requirements = UpdateRequirements.forReplaceTable(base, updates);
         errorHandler = ErrorHandlers.tableCommitHandler();
         break;
 
diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
index 833b2fb0b46f..7f0fd467bbae 100644
--- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
+++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
@@ -2655,15 +2655,20 @@ public void testReplaceTableKeepsSnapshotLog() {
         .containsExactly(snapshotBeforeReplace, snapshotAfterReplace);
   }
 
-  @Test
-  public void testConcurrentReplaceTransactions() {
+  @ParameterizedTest
+  @ValueSource(ints = {2, 3})
+  public void testConcurrentReplaceTransactions(int formatVersion) {
     C catalog = catalog();
 
     if (requiresNamespaceCreate()) {
       catalog.createNamespace(NS);
     }
 
-    Transaction transaction = catalog.buildTable(TABLE, SCHEMA).createTransaction();
+    Transaction transaction =
+        catalog
+            .buildTable(TABLE, SCHEMA)
+            .withProperty("format-version", String.valueOf(formatVersion))
+            .createTransaction();
     transaction.newFastAppend().appendFile(FILE_A).commit();
     transaction.commitTransaction();
 
@@ -2691,6 +2696,8 @@ public void testConcurrentReplaceTransactions() {
     secondReplace.commitTransaction();
 
     Table afterSecondReplace = catalog.loadTable(TABLE);
+    // All three successfully committed snapshots should be present
+    assertThat(afterSecondReplace.snapshots()).hasSize(3);
     assertThat(afterSecondReplace.schema().asStruct())
         .as("Table schema should match the original schema")
         .isEqualTo(original.schema().asStruct());
@@ -2734,9 +2741,15 @@ public void testConcurrentReplaceTransactionSchema() {
     secondReplace.commitTransaction();
 
     Table afterSecondReplace = catalog.loadTable(TABLE);
+
+    // The second replace will be rebased on the first replace, so new field IDs will be assigned
     assertThat(afterSecondReplace.schema().asStruct())
-        .as("Table schema should match the original schema")
-        .isEqualTo(original.schema().asStruct());
+        .as("Table schema should differ from the original schema")
+        .isNotEqualTo(original.schema().asStruct());
+    assertThat(afterSecondReplace.schema().select("name", "type").asStruct())
+        .as("Table schema should match the original schema in names and types")
+        .isEqualTo(original.schema().select("name", "type").asStruct());
+
     assertUUIDsMatch(original, afterSecondReplace);
     assertFiles(afterSecondReplace, FILE_C);
   }
@@ -2782,8 +2795,6 @@ public void testConcurrentReplaceTransactionSchema2() {
 
   @Test
   public void testConcurrentReplaceTransactionSchemaConflict() {
-    assumeThat(supportsServerSideRetry()).as("Schema conflicts are detected server-side").isTrue();
-
     C catalog = catalog();
 
     if (requiresNamespaceCreate()) {
@@ -2812,12 +2823,11 @@ public void testConcurrentReplaceTransactionSchemaConflict() {
     assertUUIDsMatch(original, afterFirstReplace);
     assertFiles(afterFirstReplace, FILE_B);
 
-    // even though the new schema is identical, the assertion that the last assigned id has not
-    // changed will fail
-    assertThatThrownBy(secondReplace::commitTransaction)
-        .isInstanceOf(CommitFailedException.class)
-        .hasMessageStartingWith(
-            "Commit failed: Requirement failed: last assigned field id changed");
+    secondReplace.commitTransaction();
+    Table afterSecondReplace = catalog.loadTable(TABLE);
+    assertThat(afterSecondReplace.schema().asStruct())
+        .as("Table schema should match the new schema")
+        .isEqualTo(REPLACE_SCHEMA.asStruct());
   }
 
   @Test
@@ -2902,7 +2912,6 @@ public void testConcurrentReplaceTransactionPartitionSpec2() {
   @Test
   public void testConcurrentReplaceTransactionPartitionSpecConflict() {
-    assumeThat(supportsServerSideRetry()).as("Spec conflicts are detected server-side").isTrue();
 
     C catalog = catalog();
 
     if (requiresNamespaceCreate()) {
@@ -2932,12 +2941,10 @@ public void testConcurrentReplaceTransactionPartitionSpecConflict() {
     assertUUIDsMatch(original, afterFirstReplace);
     assertFiles(afterFirstReplace, FILE_B);
 
-    // even though the new spec is identical, the assertion that the last assigned id has not
-    // changed will fail
-    assertThatThrownBy(secondReplace::commitTransaction)
-        .isInstanceOf(CommitFailedException.class)
-        .hasMessageStartingWith(
-            "Commit failed: Requirement failed: last assigned partition id changed");
+    secondReplace.commitTransaction();
+    assertThat(catalog.loadTable(TABLE).spec().fields())
+        .as("Table spec should match the new spec")
+        .isEqualTo(TABLE_SPEC.fields());
   }
 
   @Test
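
The sketch below is illustrative only and is not part of the patch; the class and method names are hypothetical. It mirrors the testConcurrentReplaceTransactions scenario above using the public catalog API, assuming the caller supplies a configured Catalog, a TableIdentifier, a Schema, and two DataFile handles. With applyUpdates() rebuilding the replacement on top of the refreshed metadata via buildReplacement() and re-applying the pending updates, the second commitTransaction() is expected to rebase onto the first replace rather than overwrite it.

import org.apache.iceberg.DataFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

class ReplaceRebaseExample {
  // Two replace transactions are created from the same starting table state and committed in
  // turn. With the rebase in BaseTransaction.applyUpdates(), the second commit refreshes the
  // underlying metadata, rebuilds its replacement on top of it, and re-applies its pending
  // updates, so the snapshot added by the first replace is not silently discarded.
  static Table replaceTwice(
      Catalog catalog, TableIdentifier ident, Schema schema, DataFile fileB, DataFile fileC) {
    Transaction firstReplace = catalog.buildTable(ident, schema).replaceTransaction();
    firstReplace.newFastAppend().appendFile(fileB).commit();

    Transaction secondReplace = catalog.buildTable(ident, schema).replaceTransaction();
    secondReplace.newFastAppend().appendFile(fileC).commit();

    firstReplace.commitTransaction();
    secondReplace.commitTransaction(); // rebased onto the first replace instead of a stale base

    return catalog.loadTable(ident); // history keeps the snapshots from both replaces
  }
}

Before this change, replace requirements were built from the original replaceBase and the transaction completely replaced the table metadata on commit, which is why the conflict tests above previously expected a CommitFailedException reporting that the last assigned field or partition id had changed.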