99 changes: 85 additions & 14 deletions core/src/main/java/org/apache/iceberg/CachingCatalog.java
@@ -32,6 +32,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
@@ -52,12 +53,34 @@ public static Catalog wrap(Catalog catalog) {
}

public static Catalog wrap(Catalog catalog, long expirationIntervalMillis) {
return wrap(catalog, true, expirationIntervalMillis);
return wrap(
catalog,
true,
expirationIntervalMillis,
CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT);
}

public static Catalog wrap(
Catalog catalog, long expirationIntervalMillis, long expireAfterWriteIntervalMillis) {
return wrap(catalog, true, expirationIntervalMillis, expireAfterWriteIntervalMillis);
}

public static Catalog wrap(
Catalog catalog, boolean caseSensitive, long expirationIntervalMillis) {
return new CachingCatalog(catalog, caseSensitive, expirationIntervalMillis);
return wrap(
catalog,
caseSensitive,
expirationIntervalMillis,
CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT);
}

public static Catalog wrap(
Catalog catalog,
boolean caseSensitive,
long expirationIntervalMillis,
long expireAfterWriteIntervalMillis) {
return new CachingCatalog(
catalog, caseSensitive, expirationIntervalMillis, expireAfterWriteIntervalMillis);
}

private final Catalog catalog;
@@ -66,23 +89,61 @@ public static Catalog wrap(
@SuppressWarnings("checkstyle:VisibilityModifier")
protected final long expirationIntervalMillis;

@SuppressWarnings("checkstyle:VisibilityModifier")
protected long expireAfterWriteIntervalMillis;

@SuppressWarnings("checkstyle:VisibilityModifier")
protected final Cache<TableIdentifier, Table> tableCache;

private CachingCatalog(Catalog catalog, boolean caseSensitive, long expirationIntervalMillis) {
this(catalog, caseSensitive, expirationIntervalMillis, Ticker.systemTicker());
private CachingCatalog(
Catalog catalog,
boolean caseSensitive,
long expirationIntervalMillis,
long expireAfterWriteIntervalMillis) {
this(
catalog,
caseSensitive,
expirationIntervalMillis,
Ticker.systemTicker(),
expireAfterWriteIntervalMillis);
}

/**
* Caching Catalog
*
* @deprecated will be removed in 1.12.0; use {@link #CachingCatalog(Catalog, boolean, long,
* Ticker, long)} instead.
*/
Collaborator comment: nit: I think instead of #CachingCatalog( simply CachingCatalog( also works.
@Deprecated
@VisibleForTesting
@SuppressWarnings("checkstyle:VisibilityModifier")
protected CachingCatalog(
Catalog catalog, boolean caseSensitive, long expirationIntervalMillis, Ticker ticker) {
this(
catalog,
caseSensitive,
expirationIntervalMillis,
ticker,
CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT);
}

@VisibleForTesting
CachingCatalog(
Catalog catalog,
boolean caseSensitive,
long expirationIntervalMillis,
Ticker ticker,
long expireAfterWriteIntervalMillis) {
Preconditions.checkArgument(
expirationIntervalMillis != 0,
"When %s is set to 0, the catalog cache should be disabled. This indicates a bug.",
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS);
expirationIntervalMillis != 0 || expireAfterWriteIntervalMillis != 0,
"When both %s and %s are set to 0, the catalog cache should be disabled. This indicates a bug.",
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS);

this.catalog = catalog;
this.caseSensitive = caseSensitive;
this.expirationIntervalMillis = expirationIntervalMillis;
this.expireAfterWriteIntervalMillis = expireAfterWriteIntervalMillis;
this.tableCache = createTableCache(ticker);
}

@@ -104,15 +165,25 @@ public void onRemoval(TableIdentifier tableIdentifier, Table table, RemovalCause
}

private Cache<TableIdentifier, Table> createTableCache(Ticker ticker) {
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder().softValues();

if (expirationIntervalMillis <= 0 && expireAfterWriteIntervalMillis <= 0) {
return Caffeine.newBuilder().softValues().build();
}

Caffeine<TableIdentifier, Table> cacheBuilder =
Caffeine.newBuilder()
.softValues()
.removalListener(new MetadataTableInvalidatingRemovalListener())
.executor(Runnable::run) // Makes the callbacks to removal listener synchronous
.ticker(ticker);

if (expirationIntervalMillis > 0) {
return cacheBuilder
.removalListener(new MetadataTableInvalidatingRemovalListener())
.executor(Runnable::run) // Makes the callbacks to removal listener synchronous
.expireAfterAccess(Duration.ofMillis(expirationIntervalMillis))
.ticker(ticker)
.build();
cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofMillis(expirationIntervalMillis));
}

if (expireAfterWriteIntervalMillis > 0) {
cacheBuilder =
cacheBuilder.expireAfterWrite(Duration.ofMillis(expireAfterWriteIntervalMillis));
}

return cacheBuilder.build();
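Not part of the diff, just an illustration for reviewers: a minimal sketch of how a caller could combine both settings through the new three-argument `wrap` overload introduced above. The helper class name and the interval values are assumptions made up for the example.

```java
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.catalog.Catalog;

// Hypothetical helper, only to show the new public overload in use.
public final class CachingCatalogWrapExample {
  private CachingCatalogWrapExample() {}

  /** Wraps a backing catalog so entries expire 30s after last access or 10min after write. */
  public static Catalog wrapWithBothPolicies(Catalog backing) {
    return CachingCatalog.wrap(
        backing,
        30_000L,    // expirationIntervalMillis -> Caffeine expireAfterAccess
        600_000L);  // expireAfterWriteIntervalMillis -> Caffeine expireAfterWrite
  }
}
```

With this overload, case sensitivity stays at its default of `true`, matching the existing two-argument `wrap` behavior.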
8 changes: 6 additions & 2 deletions core/src/main/java/org/apache/iceberg/CatalogProperties.java
@@ -56,15 +56,19 @@ private CatalogProperties() {}
* <ul>
* <li>Zero - Caching and cache expiration are both disabled
* <li>Negative Values - Cache expiration is turned off and entries expire only on refresh etc
* <li>Positive Values - Cache entries expire if not accessed via the cache after this many
* milliseconds
* <li>Positive Values - Cache entries expire after this many milliseconds, according to the
* configured cache expiration policy
* </ul>
*/
public static final String CACHE_EXPIRATION_INTERVAL_MS = "cache.expiration-interval-ms";

public static final long CACHE_EXPIRATION_INTERVAL_MS_DEFAULT = TimeUnit.SECONDS.toMillis(30);
public static final long CACHE_EXPIRATION_INTERVAL_MS_OFF = -1;

public static final String CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS =
"cache.expiration.expire-after-write-interval-ms";
public static final long CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT = 0;

/**
* Controls whether to use caching during manifest reads or not.
*
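A self-contained sketch of how these new keys could be read from a catalog properties map, mirroring the `PropertyUtil.propertyAsLong` lookups the Flink factory changes below perform; the class name and the concrete values are illustrative assumptions.

```java
import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.PropertyUtil;

public final class CacheExpirationConfigExample {
  private CacheExpirationConfigExample() {}

  public static void main(String[] args) {
    // Properties as a user might supply them; the keys come from CatalogProperties above.
    Map<String, String> props =
        ImmutableMap.of(
            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, "30000",
            CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS, "600000");

    long expireAfterAccessMs =
        PropertyUtil.propertyAsLong(
            props,
            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
    long expireAfterWriteMs =
        PropertyUtil.propertyAsLong(
            props,
            CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS,
            CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT);

    System.out.printf(
        "expire-after-access=%dms, expire-after-write=%dms%n",
        expireAfterAccessMs, expireAfterWriteMs);
  }
}
```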
33 changes: 28 additions & 5 deletions core/src/test/java/org/apache/iceberg/TestableCachingCatalog.java
@@ -34,15 +34,40 @@ public class TestableCachingCatalog extends CachingCatalog {
public static TestableCachingCatalog wrap(
Catalog catalog, Duration expirationInterval, Ticker ticker) {
return new TestableCachingCatalog(
catalog, true /* caseSensitive */, expirationInterval, ticker);
catalog,
true /* caseSensitive */,
expirationInterval,
ticker,
Duration.ofMillis(
CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT));
}

public static TestableCachingCatalog wrap(
Catalog catalog,
Duration expirationInterval,
Ticker ticker,
Duration expireAfterWriteInterval) {
return new TestableCachingCatalog(
catalog, true /* caseSensitive */, expirationInterval, ticker, expireAfterWriteInterval);
}

private final Duration cacheExpirationInterval;
private final Duration expireAfterWriteInterval;

TestableCachingCatalog(
Catalog catalog, boolean caseSensitive, Duration expirationInterval, Ticker ticker) {
super(catalog, caseSensitive, expirationInterval.toMillis(), ticker);
Catalog catalog,
boolean caseSensitive,
Duration expirationInterval,
Ticker ticker,
Duration expireAfterWriteInterval) {
super(
catalog,
caseSensitive,
expirationInterval.toMillis(),
ticker,
expireAfterWriteInterval.toMillis());
this.cacheExpirationInterval = expirationInterval;
this.expireAfterWriteInterval = expireAfterWriteInterval;
}

public Cache<TableIdentifier, Table> cache() {
@@ -58,12 +83,10 @@ public boolean isCacheExpirationEnabled() {
|| tableCache.policy().expireAfterWrite().isPresent();
}

// Throws a NoSuchElementException if this entry is not in the cache (has already been TTL'd).
public Optional<Duration> ageOf(TableIdentifier identifier) {
return tableCache.policy().expireAfterAccess().get().ageOf(identifier);
}

// Throws a NoSuchElementException if the entry is not in the cache (has already been TTL'd).
public Optional<Duration> remainingAgeFor(TableIdentifier identifier) {
return ageOf(identifier).map(cacheExpirationInterval::minus);
}
@@ -375,7 +375,7 @@ public void testCachingCatalogRejectsExpirationIntervalOfZero() {
assertThatThrownBy(() -> TestableCachingCatalog.wrap(hadoopCatalog(), Duration.ZERO, ticker))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"When cache.expiration-interval-ms is set to 0, the catalog cache should be disabled. This indicates a bug.");
"When both cache.expiration-interval-ms and cache.expiration.expire-after-write-interval-ms are set to 0, the catalog cache should be disabled. This indicates a bug.");
}

@Test
@@ -407,6 +407,49 @@ public void testInvalidateTableForChainedCachingCatalogs() throws Exception {
assertThat(wrappedCatalog.cache().asMap()).doesNotContainKey(tableIdent);
}

@Test
public void testCachePolicyExpireAfterWrite() throws Exception {
Duration expireAfterWriteInterval = Duration.ofMinutes(3);
TestableCachingCatalog catalog =
TestableCachingCatalog.wrap(
hadoopCatalog(), EXPIRATION_TTL, ticker, expireAfterWriteInterval);

Namespace namespace = Namespace.of("db", "ns1", "ns2");
TableIdentifier tableIdent = TableIdentifier.of(namespace, "tbl");
catalog.createTable(tableIdent, SCHEMA, SPEC, ImmutableMap.of("key", "value"));

// Ensure table is cached with full ttl remaining upon creation
assertThat(catalog.cache().asMap()).containsKey(tableIdent);
assertThat(catalog.remainingAgeFor(tableIdent)).isPresent().get().isEqualTo(EXPIRATION_TTL);
assertThat(catalog.cache().policy().expireAfterWrite().flatMap(t -> t.ageOf(tableIdent)))
.isPresent()
.get()
.isEqualTo(Duration.ZERO);

ticker.advance(HALF_OF_EXPIRATION);
assertThat(catalog.cache().asMap()).containsKey(tableIdent);
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
assertThat(catalog.cache().policy().expireAfterWrite().flatMap(t -> t.ageOf(tableIdent)))
.isPresent()
.get()
.isEqualTo(HALF_OF_EXPIRATION);

// Access the cache and capture the cached table to check policy behaviour
Table table = catalog.loadTable(tableIdent);

// Object age does not change after access
assertThat(catalog.cache().policy().expireAfterWrite().get().ageOf(tableIdent))
.isPresent()
.get()
.isEqualTo(HALF_OF_EXPIRATION);

ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofMinutes(1)));
assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
assertThat(catalog.loadTable(tableIdent))
.as("CachingCatalog should return a new instance after expiration")
.isNotSameAs(table);
}

public static TableIdentifier[] metadataTables(TableIdentifier tableIdent) {
return Arrays.stream(MetadataTableType.values())
.map(type -> TableIdentifier.parse(tableIdent + "." + type.name().toLowerCase(Locale.ROOT)))
5 changes: 3 additions & 2 deletions docs/docs/configuration.md
@@ -146,6 +146,7 @@ Iceberg catalogs support using catalog properties to configure catalog behaviors
| clients | 2 | client pool size |
| cache-enabled | true | Whether to cache catalog entries |
| cache.expiration-interval-ms | 30000 | How long catalog entries are locally cached, in milliseconds; 0 disables caching, negative values disable expiration |
| cache.expiration.expire-after-write-interval-ms | 0 | How long catalog entries stay cached after being written to the cache, in milliseconds, regardless of later access; 0 (the default) disables this expire-after-write policy. See the sketch after this table |
| metrics-reporter-impl | org.apache.iceberg.metrics.LoggingMetricsReporter | Custom `MetricsReporter` implementation to use in a catalog. See the [Metrics reporting](metrics-reporting.md) section for additional details |
| encryption.kms-impl | null | a custom `KeyManagementClient` implementation to use in a catalog for interactions with KMS (key management service). See the [Encryption](encryption.md) document for additional details |

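To make the interaction between the two cache expiration settings in the table above concrete, here is a minimal standalone Caffeine sketch (an assumed illustration of the intended behavior, not code from this change): when both policies are configured, an entry is evicted as soon as either limit fires.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;

public final class CombinedExpirationSketch {
  private CombinedExpirationSketch() {}

  public static Cache<String, String> build() {
    return Caffeine.newBuilder()
        // cache.expiration-interval-ms: evict 30s after the last read of an entry
        .expireAfterAccess(Duration.ofMillis(30_000))
        // cache.expiration.expire-after-write-interval-ms: evict 10min after the entry was written
        .expireAfterWrite(Duration.ofMillis(600_000))
        .build();
  }
}
```

Whichever policy triggers first evicts the entry, which is also why the `CachingCatalog` precondition earlier in this diff rejects the case where both intervals are 0.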
@@ -235,9 +236,9 @@ Warn: Setting `iceberg.engine.hive.lock-enabled`=`false` will cause HiveCatalog
This should only be set to `false` if all following conditions are met:

- [HIVE-26882](https://issues.apache.org/jira/browse/HIVE-26882)
is available on the Hive Metastore server
is available on the Hive Metastore server
- [HIVE-28121](https://issues.apache.org/jira/browse/HIVE-28121)
is available on the Hive Metastore server, if it is backed by MySQL or MariaDB
is available on the Hive Metastore server, if it is backed by MySQL or MariaDB
- All other HiveCatalogs committing to tables that this HiveCatalog commits to are also on Iceberg 1.3 or later
- All other HiveCatalogs committing to tables that this HiveCatalog commits to have also disabled Hive locks on commit.

@@ -108,7 +108,8 @@ public FlinkCatalog(
CatalogLoader catalogLoader,
Map<String, String> catalogProps,
boolean cacheEnabled,
long cacheExpirationIntervalMs) {
long cacheExpirationIntervalMs,
long cacheExpireAfterWriteIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.catalogProps = catalogProps;
@@ -118,7 +119,8 @@
Catalog originalCatalog = catalogLoader.loadCatalog();
icebergCatalog =
cacheEnabled
? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
? CachingCatalog.wrap(
originalCatalog, cacheExpirationIntervalMs, cacheExpireAfterWriteIntervalMs)
: originalCatalog;
asNamespaceCatalog =
originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
@@ -163,14 +163,21 @@ protected Catalog createCatalog(
"%s is not allowed to be 0.",
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS);

long cacheExpireAfterWriteIntervalMs =
PropertyUtil.propertyAsLong(
properties,
CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS,
CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT);

return new FlinkCatalog(
name,
defaultDatabase,
baseNamespace,
catalogLoader,
properties,
cacheEnabled,
cacheExpirationIntervalMs);
cacheExpirationIntervalMs,
cacheExpireAfterWriteIntervalMs);
}

private static Configuration mergeHiveConf(
@@ -108,7 +108,8 @@ public FlinkCatalog(
CatalogLoader catalogLoader,
Map<String, String> catalogProps,
boolean cacheEnabled,
long cacheExpirationIntervalMs) {
long cacheExpirationIntervalMs,
long cacheExpireAfterWriteIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.catalogProps = catalogProps;
@@ -118,7 +119,8 @@
Catalog originalCatalog = catalogLoader.loadCatalog();
icebergCatalog =
cacheEnabled
? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
? CachingCatalog.wrap(
originalCatalog, cacheExpirationIntervalMs, cacheExpireAfterWriteIntervalMs)
: originalCatalog;
asNamespaceCatalog =
originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
@@ -165,14 +165,21 @@ protected Catalog createCatalog(
"%s is not allowed to be 0.",
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS);

long cacheExpireAfterWriteIntervalMs =
PropertyUtil.propertyAsLong(
properties,
CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS,
CatalogProperties.CACHE_EXPIRATION_EXPIRE_AFTER_WRITE_INTERVAL_MS_DEFAULT);

return new FlinkCatalog(
name,
defaultDatabase,
baseNamespace,
catalogLoader,
properties,
cacheEnabled,
cacheExpirationIntervalMs);
cacheExpirationIntervalMs,
cacheExpireAfterWriteIntervalMs);
}

private static Configuration mergeHiveConf(