diff --git a/core/src/main/java/org/apache/iceberg/CachingCatalog.java b/core/src/main/java/org/apache/iceberg/CachingCatalog.java index 913f1a9482e1..8034d89e5790 100644 --- a/core/src/main/java/org/apache/iceberg/CachingCatalog.java +++ b/core/src/main/java/org/apache/iceberg/CachingCatalog.java @@ -32,6 +32,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.slf4j.Logger; @@ -52,12 +53,34 @@ public static Catalog wrap(Catalog catalog) { } public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) { - return wrap(catalog, true, expirationIntervalMillis); + return wrap( + catalog, + true, + expirationIntervalMillis, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT); + } + + public static Catalog wrap( + Catalog catalog, long expirationIntervalMillis, long expireAfterWriteIntervalMillis) { + return wrap(catalog, true, expirationIntervalMillis, expireAfterWriteIntervalMillis); } public static Catalog wrap( Catalog catalog, boolean caseSensitive, long expirationIntervalMillis) { - return new CachingCatalog(catalog, caseSensitive, expirationIntervalMillis); + return wrap( + catalog, + caseSensitive, + expirationIntervalMillis, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT); + } + + public static Catalog wrap( + Catalog catalog, + boolean caseSensitive, + long expirationIntervalMillis, + long expireAfterWriteIntervalMillis) { + return new CachingCatalog( + catalog, caseSensitive, expirationIntervalMillis, expireAfterWriteIntervalMillis); } private final Catalog catalog; @@ -66,23 +89,61 @@ public static Catalog wrap( @SuppressWarnings("checkstyle:VisibilityModifier") protected final long expirationIntervalMillis; + @SuppressWarnings("checkstyle:VisibilityModifier") + protected long expireAfterWriteIntervalMillis; + @SuppressWarnings("checkstyle:VisibilityModifier") protected final Cache tableCache; - private CachingCatalog(Catalog catalog, boolean caseSensitive, long expirationIntervalMillis) { - this(catalog, caseSensitive, expirationIntervalMillis, Ticker.systemTicker()); + private CachingCatalog( + Catalog catalog, + boolean caseSensitive, + long expirationIntervalMillis, + long expireAfterWriteIntervalMillis) { + this( + catalog, + caseSensitive, + expirationIntervalMillis, + Ticker.systemTicker(), + expireAfterWriteIntervalMillis); } + /** + * Caching Catalog + * + * @deprecated will be removed in 1.12.0; use {@link #CachingCatalog(Catalog, boolean, long, + * Ticker, long)} instead. + */ + @Deprecated + @VisibleForTesting @SuppressWarnings("checkstyle:VisibilityModifier") protected CachingCatalog( Catalog catalog, boolean caseSensitive, long expirationIntervalMillis, Ticker ticker) { + this( + catalog, + caseSensitive, + expirationIntervalMillis, + ticker, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT); + } + + @VisibleForTesting + CachingCatalog( + Catalog catalog, + boolean caseSensitive, + long expirationIntervalMillis, + Ticker ticker, + long expireAfterWriteIntervalMillis) { Preconditions.checkArgument( - expirationIntervalMillis != 0, - "When %s is set to 0, the catalog cache should be disabled. 
This indicates a bug.", - CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS); + expirationIntervalMillis != 0 || expireAfterWriteIntervalMillis != 0, + "When both %s and %s are set to 0, the catalog cache should be disabled. This indicates a bug.", + CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS); + this.catalog = catalog; this.caseSensitive = caseSensitive; this.expirationIntervalMillis = expirationIntervalMillis; + this.expireAfterWriteIntervalMillis = expireAfterWriteIntervalMillis; this.tableCache = createTableCache(ticker); } @@ -104,15 +165,25 @@ public void onRemoval(TableIdentifier tableIdentifier, Table table, RemovalCause } private Cache createTableCache(Ticker ticker) { - Caffeine cacheBuilder = Caffeine.newBuilder().softValues(); + + if (expirationIntervalMillis <= 0 && expireAfterWriteIntervalMillis <= 0) { + return Caffeine.newBuilder().softValues().build(); + } + + Caffeine cacheBuilder = + Caffeine.newBuilder() + .softValues() + .removalListener(new MetadataTableInvalidatingRemovalListener()) + .executor(Runnable::run) // Makes the callbacks to removal listener synchronous + .ticker(ticker); if (expirationIntervalMillis > 0) { - return cacheBuilder - .removalListener(new MetadataTableInvalidatingRemovalListener()) - .executor(Runnable::run) // Makes the callbacks to removal listener synchronous - .expireAfterAccess(Duration.ofMillis(expirationIntervalMillis)) - .ticker(ticker) - .build(); + cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofMillis(expirationIntervalMillis)); + } + + if (expireAfterWriteIntervalMillis > 0) { + cacheBuilder = + cacheBuilder.expireAfterWrite(Duration.ofMillis(expireAfterWriteIntervalMillis)); } return cacheBuilder.build(); diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index f35c90c4e80c..df3191a0ecd0 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -56,8 +56,8 @@ private CatalogProperties() {} *
    *
    *   • Zero - Caching and cache expiration are both disabled
    *   • Negative Values - Cache expiration is turned off and entries expire only on refresh etc
-   *   • Positive Values - Cache entries expire if not accessed via the cache after this many
-   *       milliseconds
+   *   • Positive Values - Cache entries expire after this many milliseconds, according to the
+   *       configured cache expiration policy
    *
*/ public static final String CACHE_EXPIRATION_INTERVAL_MS = "cache.expiration-interval-ms"; @@ -65,6 +65,10 @@ private CatalogProperties() {} public static final long CACHE_EXPIRATION_INTERVAL_MS_DEFAULT = TimeUnit.SECONDS.toMillis(30); public static final long CACHE_EXPIRATION_INTERVAL_MS_OFF = -1; + public static final String CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS = + "cache.expiration.expire-after-write-interval-ms"; + public static final long CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT = 0; + /** * Controls whether to use caching during manifest reads or not. * diff --git a/core/src/test/java/org/apache/iceberg/TestableCachingCatalog.java b/core/src/test/java/org/apache/iceberg/TestableCachingCatalog.java index db3d526e8d6d..38491ad34de4 100644 --- a/core/src/test/java/org/apache/iceberg/TestableCachingCatalog.java +++ b/core/src/test/java/org/apache/iceberg/TestableCachingCatalog.java @@ -34,15 +34,40 @@ public class TestableCachingCatalog extends CachingCatalog { public static TestableCachingCatalog wrap( Catalog catalog, Duration expirationInterval, Ticker ticker) { return new TestableCachingCatalog( - catalog, true /* caseSensitive */, expirationInterval, ticker); + catalog, + true /* caseSensitive */, + expirationInterval, + ticker, + Duration.ofMillis( + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT)); + } + + public static TestableCachingCatalog wrap( + Catalog catalog, + Duration expirationInterval, + Ticker ticker, + Duration expireAfterWriteInterval) { + return new TestableCachingCatalog( + catalog, true /* caseSensitive */, expirationInterval, ticker, expireAfterWriteInterval); } private final Duration cacheExpirationInterval; + private final Duration expireAfterWriteInterval; TestableCachingCatalog( - Catalog catalog, boolean caseSensitive, Duration expirationInterval, Ticker ticker) { - super(catalog, caseSensitive, expirationInterval.toMillis(), ticker); + Catalog catalog, + boolean caseSensitive, + Duration expirationInterval, + Ticker ticker, + Duration expireAfterWriteInterval) { + super( + catalog, + caseSensitive, + expirationInterval.toMillis(), + ticker, + expireAfterWriteInterval.toMillis()); this.cacheExpirationInterval = expirationInterval; + this.expireAfterWriteInterval = expireAfterWriteInterval; } public Cache cache() { @@ -58,12 +83,10 @@ public boolean isCacheExpirationEnabled() { || tableCache.policy().expireAfterWrite().isPresent(); } - // Throws a NoSuchElementException if this entry is not in the cache (has already been TTL'd). public Optional ageOf(TableIdentifier identifier) { return tableCache.policy().expireAfterAccess().get().ageOf(identifier); } - // Throws a NoSuchElementException if the entry is not in the cache (has already been TTL'd). 
   public Optional<Duration> remainingAgeFor(TableIdentifier identifier) {
     return ageOf(identifier).map(cacheExpirationInterval::minus);
   }
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestCachingCatalog.java b/core/src/test/java/org/apache/iceberg/hadoop/TestCachingCatalog.java
index 4c711c118772..97b6195bb0ce 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestCachingCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestCachingCatalog.java
@@ -375,7 +375,7 @@ public void testCachingCatalogRejectsExpirationIntervalOfZero() {
     assertThatThrownBy(() -> TestableCachingCatalog.wrap(hadoopCatalog(), Duration.ZERO, ticker))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage(
-            "When cache.expiration-interval-ms is set to 0, the catalog cache should be disabled. This indicates a bug.");
+            "When both cache.expiration-interval-ms and cache.expiration.expire-after-write-interval-ms are set to 0, the catalog cache should be disabled. This indicates a bug.");
   }
 
   @Test
@@ -407,6 +407,49 @@ public void testInvalidateTableForChainedCachingCatalogs() throws Exception {
     assertThat(wrappedCatalog.cache().asMap()).doesNotContainKey(tableIdent);
   }
 
+  @Test
+  public void testCachePolicyExpireAfterWrite() throws Exception {
+    Duration expireAfterWriteInterval = Duration.ofMinutes(3);
+    TestableCachingCatalog catalog =
+        TestableCachingCatalog.wrap(
+            hadoopCatalog(), EXPIRATION_TTL, ticker, expireAfterWriteInterval);
+
+    Namespace namespace = Namespace.of("db", "ns1", "ns2");
+    TableIdentifier tableIdent = TableIdentifier.of(namespace, "tbl");
+    Table table = catalog.createTable(tableIdent, SCHEMA, SPEC, ImmutableMap.of("key", "value"));
+
+    // Ensure the table is cached with the full TTL remaining upon creation
+    assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+    assertThat(catalog.remainingAgeFor(tableIdent)).isPresent().get().isEqualTo(EXPIRATION_TTL);
+    assertThat(catalog.cache().policy().expireAfterWrite().flatMap(t -> t.ageOf(tableIdent)))
+        .isPresent()
+        .get()
+        .isEqualTo(Duration.ZERO);
+
+    ticker.advance(HALF_OF_EXPIRATION);
+    assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+    assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
+    assertThat(catalog.cache().policy().expireAfterWrite().flatMap(t -> t.ageOf(tableIdent)))
+        .isPresent()
+        .get()
+        .isEqualTo(HALF_OF_EXPIRATION);
+
+    // Access the cache to verify the policy behaviour
+    catalog.loadTable(tableIdent);
+
+    // The entry's write age does not change after access
+    assertThat(catalog.cache().policy().expireAfterWrite().get().ageOf(tableIdent))
+        .isPresent()
+        .get()
+        .isEqualTo(HALF_OF_EXPIRATION);
+
+    ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofMinutes(1)));
+    assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+    assertThat(catalog.loadTable(tableIdent))
+        .as("CachingCatalog should return a new instance after expiration")
+        .isNotSameAs(table);
+  }
+
   public static TableIdentifier[] metadataTables(TableIdentifier tableIdent) {
     return Arrays.stream(MetadataTableType.values())
         .map(type -> TableIdentifier.parse(tableIdent + "." + type.name().toLowerCase(Locale.ROOT)))
diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md
index 3f730a4f4c13..99873c663796 100644
--- a/docs/docs/configuration.md
+++ b/docs/docs/configuration.md
@@ -146,6 +146,7 @@ Iceberg catalogs support using catalog properties to configure catalog behaviors
 | clients | 2 | client pool size |
 | cache-enabled | true | Whether to cache catalog entries |
 | cache.expiration-interval-ms | 30000 | How long catalog entries are locally cached, in milliseconds; 0 disables caching, negative values disable expiration |
+| cache.expiration.expire-after-write-interval-ms | 0 | How long a catalog entry stays cached after it is written, in milliseconds; entries expire even if they are still being accessed and are not refreshed on access; 0 disables this policy (default) |
 | metrics-reporter-impl | org.apache.iceberg.metrics.LoggingMetricsReporter | Custom `MetricsReporter` implementation to use in a catalog. See the [Metrics reporting](metrics-reporting.md) section for additional details |
 | encryption.kms-impl | null | a custom `KeyManagementClient` implementation to use in a catalog for interactions with KMS (key management service). See the [Encryption](encryption.md) document for additional details |
@@ -235,9 +236,9 @@ Warn: Setting `iceberg.engine.hive.lock-enabled`=`false` will cause HiveCatalog
 This should only be set to `false` if all following conditions are met:
 - [HIVE-26882](https://issues.apache.org/jira/browse/HIVE-26882)
-is available on the Hive Metastore server
+  is available on the Hive Metastore server
 - [HIVE-28121](https://issues.apache.org/jira/browse/HIVE-28121)
-is available on the Hive Metastore server, if it is backed by MySQL or MariaDB
+  is available on the Hive Metastore server, if it is backed by MySQL or MariaDB
 - All other HiveCatalogs committing to tables that this HiveCatalog commits to are also on Iceberg 1.3 or later
 - All other HiveCatalogs committing to tables that this HiveCatalog commits to have also disabled Hive locks on commit.
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 4bb235b811d0..84c2b2fe53aa 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -108,7 +108,8 @@ public FlinkCatalog(
       CatalogLoader catalogLoader,
       Map<String, String> catalogProps,
       boolean cacheEnabled,
-      long cacheExpirationIntervalMs) {
+      long cacheExpirationIntervalMs,
+      long cacheExpireAfterWriteIntervalMs) {
     super(catalogName, defaultDatabase);
     this.catalogLoader = catalogLoader;
     this.catalogProps = catalogProps;
@@ -118,7 +119,8 @@ public FlinkCatalog(
     Catalog originalCatalog = catalogLoader.loadCatalog();
     icebergCatalog =
         cacheEnabled
-            ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
+            ? CachingCatalog.wrap(
+                originalCatalog, cacheExpirationIntervalMs, cacheExpireAfterWriteIntervalMs)
             : originalCatalog;
     asNamespaceCatalog =
         originalCatalog instanceof SupportsNamespaces ?
(SupportsNamespaces) originalCatalog : null; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java index dd065617bd88..cd78d68dd58f 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java @@ -163,6 +163,12 @@ protected Catalog createCatalog( "%s is not allowed to be 0.", CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS); + long cacheExpireAfterWriteIntervalMs = + PropertyUtil.propertyAsLong( + properties, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT); + return new FlinkCatalog( name, defaultDatabase, @@ -170,7 +176,8 @@ protected Catalog createCatalog( catalogLoader, properties, cacheEnabled, - cacheExpirationIntervalMs); + cacheExpirationIntervalMs, + cacheExpireAfterWriteIntervalMs); } private static Configuration mergeHiveConf( diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 4bb235b811d0..84c2b2fe53aa 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -108,7 +108,8 @@ public FlinkCatalog( CatalogLoader catalogLoader, Map catalogProps, boolean cacheEnabled, - long cacheExpirationIntervalMs) { + long cacheExpirationIntervalMs, + long cacheExpireAfterWriteIntervalMs) { super(catalogName, defaultDatabase); this.catalogLoader = catalogLoader; this.catalogProps = catalogProps; @@ -118,7 +119,8 @@ public FlinkCatalog( Catalog originalCatalog = catalogLoader.loadCatalog(); icebergCatalog = cacheEnabled - ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs) + ? CachingCatalog.wrap( + originalCatalog, cacheExpirationIntervalMs, cacheExpireAfterWriteIntervalMs) : originalCatalog; asNamespaceCatalog = originalCatalog instanceof SupportsNamespaces ? 
(SupportsNamespaces) originalCatalog : null; diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java index 33cbc92ddeec..775e11312f4a 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java @@ -165,6 +165,12 @@ protected Catalog createCatalog( "%s is not allowed to be 0.", CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS); + long cacheExpireAfterWriteIntervalMs = + PropertyUtil.propertyAsLong( + properties, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT); + return new FlinkCatalog( name, defaultDatabase, @@ -172,7 +178,8 @@ protected Catalog createCatalog( catalogLoader, properties, cacheEnabled, - cacheExpirationIntervalMs); + cacheExpirationIntervalMs, + cacheExpireAfterWriteIntervalMs); } private static Configuration mergeHiveConf( diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 4bb235b811d0..84c2b2fe53aa 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -108,7 +108,8 @@ public FlinkCatalog( CatalogLoader catalogLoader, Map catalogProps, boolean cacheEnabled, - long cacheExpirationIntervalMs) { + long cacheExpirationIntervalMs, + long cacheExpireAfterWriteIntervalMs) { super(catalogName, defaultDatabase); this.catalogLoader = catalogLoader; this.catalogProps = catalogProps; @@ -118,7 +119,8 @@ public FlinkCatalog( Catalog originalCatalog = catalogLoader.loadCatalog(); icebergCatalog = cacheEnabled - ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs) + ? CachingCatalog.wrap( + originalCatalog, cacheExpirationIntervalMs, cacheExpireAfterWriteIntervalMs) : originalCatalog; asNamespaceCatalog = originalCatalog instanceof SupportsNamespaces ? 
(SupportsNamespaces) originalCatalog : null; diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java index 33cbc92ddeec..775e11312f4a 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java @@ -165,6 +165,12 @@ protected Catalog createCatalog( "%s is not allowed to be 0.", CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS); + long cacheExpireAfterWriteIntervalMs = + PropertyUtil.propertyAsLong( + properties, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT); + return new FlinkCatalog( name, defaultDatabase, @@ -172,7 +178,8 @@ protected Catalog createCatalog( catalogLoader, properties, cacheEnabled, - cacheExpirationIntervalMs); + cacheExpirationIntervalMs, + cacheExpireAfterWriteIntervalMs); } private static Configuration mergeHiveConf( diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index c07508568dbf..18a43bf428c2 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -763,9 +763,15 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT); + long cacheExpireAfterWriteIntervalMs = + PropertyUtil.propertyAsLong( + options, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT); + // An expiration interval of 0ms effectively disables caching. // Do not wrap with CachingCatalog. - if (cacheExpirationIntervalMs == 0) { + if (cacheExpirationIntervalMs == 0 && cacheExpireAfterWriteIntervalMs == 0) { this.cacheEnabled = false; } @@ -777,7 +783,11 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { new HadoopTables(SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name)); this.icebergCatalog = cacheEnabled - ? CachingCatalog.wrap(catalog, cacheCaseSensitive, cacheExpirationIntervalMs) + ? CachingCatalog.wrap( + catalog, + cacheCaseSensitive, + cacheExpirationIntervalMs, + cacheExpireAfterWriteIntervalMs) : catalog; if (catalog instanceof SupportsNamespaces) { this.asNamespaceCatalog = (SupportsNamespaces) catalog; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 913fe25e6dd1..f8bef9e4d775 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -765,9 +765,15 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT); + long cacheExpireAfterWriteIntervalMs = + PropertyUtil.propertyAsLong( + options, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT); + // An expiration interval of 0ms effectively disables caching. 
// Do not wrap with CachingCatalog. - if (cacheExpirationIntervalMs == 0) { + if (cacheExpirationIntervalMs == 0 && cacheExpireAfterWriteIntervalMs == 0) { this.cacheEnabled = false; } @@ -779,7 +785,11 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { new HadoopTables(SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name)); this.icebergCatalog = cacheEnabled - ? CachingCatalog.wrap(catalog, cacheCaseSensitive, cacheExpirationIntervalMs) + ? CachingCatalog.wrap( + catalog, + cacheCaseSensitive, + cacheExpirationIntervalMs, + cacheExpireAfterWriteIntervalMs) : catalog; if (catalog instanceof SupportsNamespaces) { this.asNamespaceCatalog = (SupportsNamespaces) catalog; diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index da22607d05b0..0fe700cf715e 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -764,9 +764,15 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT); + long cacheExpireAfterWriteIntervalMs = + PropertyUtil.propertyAsLong( + options, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT); + // An expiration interval of 0ms effectively disables caching. // Do not wrap with CachingCatalog. - if (cacheExpirationIntervalMs == 0) { + if (cacheExpirationIntervalMs == 0 && cacheExpireAfterWriteIntervalMs == 0) { this.cacheEnabled = false; } @@ -779,7 +785,11 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { SparkUtil.hadoopConfCatalogOverrides(SparkSession.getActiveSession().get(), name)); this.icebergCatalog = cacheEnabled - ? CachingCatalog.wrap(catalog, cacheCaseSensitive, cacheExpirationIntervalMs) + ? CachingCatalog.wrap( + catalog, + cacheCaseSensitive, + cacheExpirationIntervalMs, + cacheExpireAfterWriteIntervalMs) : catalog; if (catalog instanceof SupportsNamespaces) { this.asNamespaceCatalog = (SupportsNamespaces) catalog; diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index da22607d05b0..0fe700cf715e 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -764,9 +764,15 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT); + long cacheExpireAfterWriteIntervalMs = + PropertyUtil.propertyAsLong( + options, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS, + CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT); + // An expiration interval of 0ms effectively disables caching. // Do not wrap with CachingCatalog. 
- if (cacheExpirationIntervalMs == 0) { + if (cacheExpirationIntervalMs == 0 && cacheExpireAfterWriteIntervalMs == 0) { this.cacheEnabled = false; } @@ -779,7 +785,11 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { SparkUtil.hadoopConfCatalogOverrides(SparkSession.getActiveSession().get(), name)); this.icebergCatalog = cacheEnabled - ? CachingCatalog.wrap(catalog, cacheCaseSensitive, cacheExpirationIntervalMs) + ? CachingCatalog.wrap( + catalog, + cacheCaseSensitive, + cacheExpirationIntervalMs, + cacheExpireAfterWriteIntervalMs) : catalog; if (catalog instanceof SupportsNamespaces) { this.asNamespaceCatalog = (SupportsNamespaces) catalog;
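
For reference, a minimal sketch of how the new expire-after-write setting could be used once this change lands. The class and method names (`ExpireAfterWriteExample`, `wrapWithWriteExpiry`, `cacheProperties`) and the 30 s / 3 min values are illustrative assumptions, not part of this patch; the `CachingCatalog.wrap(Catalog, boolean, long, long)` overload and the `cache.expiration.expire-after-write-interval-ms` property are the ones introduced above.

```java
import java.util.Map;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class ExpireAfterWriteExample {

  private ExpireAfterWriteExample() {}

  // Wraps an already-initialized catalog so that cached entries expire 30 s after the
  // last access OR 3 minutes after they were written into the cache, whichever comes first.
  // Uses the four-argument wrap(...) overload added by this change.
  public static Catalog wrapWithWriteExpiry(Catalog backing) {
    return CachingCatalog.wrap(
        backing,
        true /* caseSensitive */,
        30_000L /* expirationIntervalMillis, i.e. expire-after-access */,
        180_000L /* expireAfterWriteIntervalMillis */);
  }

  // Equivalent property-based configuration, e.g. for a Flink or Spark catalog definition
  // resolved through FlinkCatalogFactory or SparkCatalog.initialize above.
  public static Map<String, String> cacheProperties() {
    return ImmutableMap.of(
        CatalogProperties.CACHE_ENABLED, "true",
        CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, "30000",
        CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS, "180000");
  }
}
```

With both properties set, whichever policy fires first wins: an entry is evicted either when it has not been accessed for `cache.expiration-interval-ms` or when `cache.expiration.expire-after-write-interval-ms` has elapsed since it was cached, matching the `expireAfterAccess`/`expireAfterWrite` combination configured in `createTableCache`.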