Skip to content

Commit

Permalink
Add Presto Procedure to invalidate Metastore Cache
Browse files Browse the repository at this point in the history
  • Loading branch information
imjalpreet committed Jan 24, 2025
1 parent ff8b97d commit 5860e1e
Show file tree
Hide file tree
Showing 10 changed files with 708 additions and 19 deletions.
46 changes: 34 additions & 12 deletions presto-docs/src/main/sphinx/connector/hive.rst
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,26 @@ Metastore Configuration Properties

The required Hive metastore can be configured with a number of properties.

======================================= ============================================================ ============
Property Name Description Default
======================================= ============================================================ ============
``hive.metastore-timeout`` Timeout for Hive metastore requests. ``10s``
======================================================= ============================================================= ============
Property Name Description Default
======================================================= ============================================================= ============
``hive.metastore-timeout`` Timeout for Hive metastore requests. ``10s``

``hive.metastore-cache-ttl`` Duration how long cached metastore data should be considered ``0s``
valid.
``hive.metastore-cache-ttl`` Duration how long cached metastore data should be considered ``0s``
valid.

``hive.metastore-cache-maximum-size`` Hive metastore cache maximum size. 10000
``hive.metastore-cache-maximum-size`` Hive metastore cache maximum size. 10000

``hive.metastore-refresh-interval`` Asynchronously refresh cached metastore data after access ``0s``
if it is older than this but is not yet expired, allowing
subsequent accesses to see fresh data.
``hive.metastore-refresh-interval`` Asynchronously refresh cached metastore data after access ``0s``
if it is older than this but is not yet expired, allowing
subsequent accesses to see fresh data.

``hive.metastore-refresh-max-threads`` Maximum threads used to refresh cached metastore data. 100
``hive.metastore-refresh-max-threads`` Maximum threads used to refresh cached metastore data. 100

======================================= ============================================================ ============
``hive.invalidate-metastore-cache-procedure-enabled`` When enabled, users will be able to invalidate metastore false
cache on demand.

======================================================= ============================================================= ============

AWS Glue Catalog Configuration Properties
-----------------------------------------
Expand Down Expand Up @@ -952,6 +955,25 @@ The following procedures are available:

Invalidate the directory list cache for the specified ``directory_path``.

* ``system.invalidate_metastore_cache()``

Invalidate all metastore caches.

* ``system.invalidate_metastore_cache(schema_name)``

Invalidate all metastore cache entries linked to a specific schema.

* ``system.invalidate_metastore_cache(schema_name, table_name)``

Invalidate all metastore cache entries linked to a specific table.

* ``system.invalidate_metastore_cache(schema_name, table_name, partition_columns, partition_values)``

Invalidate all metastore cache entries linked to a specific partition.

Note: To enable the ``system.invalidate_metastore_cache`` procedure, set the
``hive.invalidate-metastore-cache-procedure-enabled`` property described in the `Metastore Configuration Properties`_ table.

Extra Hidden Columns
--------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class MetastoreClientConfig
// NOTE(review): presumably caps how many partition columns are cached per entry — confirm against usage.
private int partitionCacheColumnCountLimit = 500;
// Authentication mechanism used when talking to the Hive metastore; no authentication by default.
private HiveMetastoreAuthenticationType hiveMetastoreAuthenticationType = HiveMetastoreAuthenticationType.NONE;
private boolean deleteFilesOnTableDrop;
// Guards the system.invalidate_metastore_cache procedure; disabled by default.
private boolean invalidateMetastoreCacheProcedureEnabled;

public HostAndPort getMetastoreSocksProxy()
{
Expand Down Expand Up @@ -303,4 +304,17 @@ public MetastoreClientConfig setDeleteFilesOnTableDrop(boolean deleteFilesOnTabl
this.deleteFilesOnTableDrop = deleteFilesOnTableDrop;
return this;
}

/**
 * Returns whether the on-demand metastore cache invalidation procedure is enabled.
 */
public boolean isInvalidateMetastoreCacheProcedureEnabled()
{
    return this.invalidateMetastoreCacheProcedureEnabled;
}

/**
 * Enables or disables the on-demand metastore cache invalidation procedure.
 *
 * @return this config instance, for chained configuration calls
 */
@Config("hive.invalidate-metastore-cache-procedure-enabled")
@ConfigDescription("When enabled, users will be able to invalidate metastore cache on demand")
public MetastoreClientConfig setInvalidateMetastoreCacheProcedureEnabled(boolean invalidateMetastoreCacheProcedureEnabled)
{
    this.invalidateMetastoreCacheProcedureEnabled = invalidateMetastoreCacheProcedureEnabled;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.function.Predicate;

import static com.facebook.presto.hive.HiveErrorCode.HIVE_CORRUPTED_PARTITION_CACHE;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY;
Expand All @@ -72,6 +73,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.hadoop.hive.common.FileUtils.makePartName;

@ThreadSafe
public class InMemoryCachingHiveMetastore
Expand Down Expand Up @@ -798,6 +800,187 @@ public Set<HivePrivilegeInfo> loadTablePrivileges(KeyAndContext<UserTableKey> lo
return delegate.listTablePrivileges(loadTablePrivilegesKey.getContext(), loadTablePrivilegesKey.getKey().getDatabase(), loadTablePrivilegesKey.getKey().getTable(), loadTablePrivilegesKey.getKey().getPrincipal());
}

/**
 * Invalidates every cached metastore entry associated with the given database:
 * the database entry itself, the database-name listing, all table-level entries
 * in the database, and all partition-level entries of those tables.
 *
 * @throws IllegalArgumentException if {@code databaseName} is null or empty
 */
public void invalidateCache(MetastoreContext metastoreContext, String databaseName)
{
    checkArgument(databaseName != null && !databaseName.isEmpty(), "databaseName cannot be null or empty");

    MetastoreContext resolvedContext = applyImpersonationToMetastoreContext(metastoreContext);

    // Drop the cached database entry itself
    invalidateCacheForKey(databaseCache, resolvedContext, key -> key.getKey().equals(databaseName));

    // The cached listing of database names may reference this database; drop it entirely
    invalidateCacheForKey(databaseNamesCache, resolvedContext, key -> true);

    // Drop table-level entries that belong to this database
    invalidateCacheForKey(tableCache, resolvedContext, key -> key.getKey().getSchemaName().equals(databaseName));
    invalidateCacheForKey(tableNamesCache, resolvedContext, key -> key.getKey().equals(databaseName));
    invalidateCacheForKey(tableConstraintsCache, resolvedContext, key -> key.getKey().getDatabaseName().equals(databaseName));
    invalidateCacheForKey(tablePrivilegesCache, resolvedContext, key -> key.getKey().getDatabase().equals(databaseName));
    invalidateCacheForKey(tableStatisticsCache, resolvedContext, key -> key.getKey().getDatabaseName().equals(databaseName));
    invalidateCacheForKey(viewNamesCache, resolvedContext, key -> key.getKey().equals(databaseName));

    // Drop partition-level entries for every table in this database
    invalidateCacheForKey(partitionNamesCache, resolvedContext, key -> key.getKey().getDatabaseName().equals(databaseName));
    invalidateCacheForKey(partitionCache, resolvedContext, key -> key.getKey().getHiveTableName().getDatabaseName().equals(databaseName));
    invalidateCacheForKey(partitionFilterCache, resolvedContext, key -> key.getKey().getHiveTableName().getDatabaseName().equals(databaseName));
    invalidateCacheForKey(partitionStatisticsCache, resolvedContext, key -> key.getKey().getHiveTableName().getDatabaseName().equals(databaseName));
}

/**
 * Invalidates every cached metastore entry associated with the given table:
 * the table entry, table-scoped metadata (constraints, privileges, statistics),
 * the per-database name listings that could mention it, and all partition-level
 * entries of the table.
 *
 * @throws IllegalArgumentException if {@code databaseName} or {@code tableName} is null or empty
 */
public void invalidateCache(MetastoreContext metastoreContext, String databaseName, String tableName)
{
    checkArgument(databaseName != null && !databaseName.isEmpty(), "databaseName cannot be null or empty");
    checkArgument(tableName != null && !tableName.isEmpty(), "tableName cannot be null or empty");

    MetastoreContext resolvedContext = applyImpersonationToMetastoreContext(metastoreContext);
    HiveTableName hiveTableName = hiveTableName(databaseName, tableName);

    // Drop the cached table entry itself
    invalidateCacheForKey(tableCache, resolvedContext, key -> isSameTable(key.getKey(), hiveTableName));

    // Name listings for the database may mention this table/view; drop them
    invalidateCacheForKey(tableNamesCache, resolvedContext, key -> key.getKey().equals(databaseName));
    invalidateCacheForKey(viewNamesCache, resolvedContext, key -> key.getKey().equals(databaseName));

    // Drop table-scoped metadata: constraints, privileges, statistics
    invalidateCacheForKey(tableConstraintsCache, resolvedContext, key -> key.getKey().equals(hiveTableName));
    invalidateCacheForKey(tablePrivilegesCache, resolvedContext, key -> key.getKey().matches(databaseName, tableName));
    invalidateCacheForKey(tableStatisticsCache, resolvedContext, key -> key.getKey().equals(hiveTableName));

    // Drop partition-level entries for all partitions of this table
    invalidateCacheForKey(partitionNamesCache, resolvedContext, key -> key.getKey().equals(hiveTableName));
    invalidateCacheForKey(partitionCache, resolvedContext, key -> key.getKey().getHiveTableName().equals(hiveTableName));
    invalidateCacheForKey(partitionFilterCache, resolvedContext, key -> key.getKey().getHiveTableName().equals(hiveTableName));
    invalidateCacheForKey(partitionStatisticsCache, resolvedContext, key -> key.getKey().getHiveTableName().equals(hiveTableName));
}

/**
 * Invalidates every cached metastore entry associated with one specific partition
 * of the given table. Table-scoped partition listings and filter results are also
 * dropped, since they may reference the partition.
 *
 * @param partitionColumnNames partition key column names, parallel to {@code partitionValues}
 * @param partitionValues partition key values, parallel to {@code partitionColumnNames}
 * @throws IllegalArgumentException if any argument is null/empty or the two lists differ in length
 */
public void invalidateCache(
        MetastoreContext metastoreContext,
        String databaseName,
        String tableName,
        List<String> partitionColumnNames,
        List<String> partitionValues)
{
    checkArgument(databaseName != null && !databaseName.isEmpty(), "databaseName cannot be null or empty");
    checkArgument(tableName != null && !tableName.isEmpty(), "tableName cannot be null or empty");
    checkArgument(partitionColumnNames != null && !partitionColumnNames.isEmpty(), "partitionColumnNames cannot be null or empty");
    checkArgument(partitionValues != null && !partitionValues.isEmpty(), "partitionValues cannot be null or empty");
    checkArgument(partitionColumnNames.size() == partitionValues.size(), "partitionColumnNames and partitionValues should be of same length");

    MetastoreContext resolvedContext = applyImpersonationToMetastoreContext(metastoreContext);
    String partitionName = makePartName(partitionColumnNames, partitionValues);
    HiveTableName hiveTableName = hiveTableName(databaseName, tableName);

    // Matches cache keys that name exactly this partition of this table; a key
    // without a partition-name version is treated as a non-match.
    Predicate<KeyAndContext<HivePartitionName>> matchesPartition = key ->
            key.getKey().getHiveTableName().equals(hiveTableName)
                    && key.getKey().getPartitionNameWithVersion().map(version -> version.getPartitionName().equals(partitionName)).orElse(false);

    // The table's partition-name listing may include this partition; drop it
    invalidateCacheForKey(partitionNamesCache, resolvedContext, key -> key.getKey().equals(hiveTableName));

    // Drop the partition entry and its statistics
    invalidateCacheForKey(partitionCache, resolvedContext, matchesPartition);
    invalidateCacheForKey(partitionStatisticsCache, resolvedContext, matchesPartition);

    // Filter results for the table may reference this partition; drop them all
    invalidateCacheForKey(partitionFilterCache, resolvedContext, key -> key.getKey().getHiveTableName().equals(hiveTableName));
}

/**
 * Removes from {@code cache} every key accepted by {@code keyPredicate}. When
 * impersonation is enabled on the (already-normalized) context, only keys whose
 * context matches it are considered, so one user's invalidation cannot evict
 * another user's entries.
 */
private <K> void invalidateCacheForKey(LoadingCache<KeyAndContext<K>, ?> cache, MetastoreContext newMetastoreContext, Predicate<KeyAndContext<K>> keyPredicate)
{
    // The cache's asMap() view is a concurrent map, so removing entries while
    // iterating over its key set is safe.
    for (KeyAndContext<K> key : cache.asMap().keySet()) {
        boolean contextMatches = !newMetastoreContext.isImpersonationEnabled() || key.getContext().equals(newMetastoreContext);
        if (contextMatches && keyPredicate.test(key)) {
            cache.invalidate(key);
        }
    }
}

private static class KeyAndContext<T>
{
private final MetastoreContext context;
Expand Down Expand Up @@ -858,7 +1041,7 @@ public String toString()
}
}

private <T> KeyAndContext<T> getCachingKey(MetastoreContext context, T key)
private MetastoreContext applyImpersonationToMetastoreContext(MetastoreContext context)
{
if (metastoreImpersonationEnabled) {
context = new MetastoreContext(
Expand All @@ -874,7 +1057,12 @@ private <T> KeyAndContext<T> getCachingKey(MetastoreContext context, T key)
context.getWarningCollector(),
context.getRuntimeStats());
}
return new KeyAndContext<>(context, key);
return context;
}

/**
 * Builds the composite cache key for {@code key}, normalizing the context first
 * so that cache lookups are consistent with the impersonation configuration.
 */
private <T> KeyAndContext<T> getCachingKey(MetastoreContext context, T key)
{
    MetastoreContext resolvedContext = applyImpersonationToMetastoreContext(context);
    return new KeyAndContext<>(resolvedContext, key);
}

private static CacheBuilder<Object, Object> newCacheBuilder(OptionalLong expiresAfterWriteMillis, OptionalLong refreshMillis, long maximumSize)
Expand Down
Loading

0 comments on commit 5860e1e

Please sign in to comment.