-
Notifications
You must be signed in to change notification settings - Fork 406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#6566] improvement(core): Add the cache mechanism for metalake and use cache to load in-use
information.
#6569
base: main
Are you sure you want to change the base?
Changes from all commits
b6e7c82
0e1c41f
302d241
bade110
1d181a8
9e11e3e
aa8aa80
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -262,7 +262,7 @@ private ModelCatalog asModels() { | |
|
||
private final Config config; | ||
|
||
@VisibleForTesting final Cache<NameIdentifier, CatalogWrapper> catalogCache; | ||
@VisibleForTesting static Cache<NameIdentifier, CatalogWrapper> catalogCache; | ||
|
||
private final EntityStore store; | ||
|
||
|
@@ -281,7 +281,7 @@ public CatalogManager(Config config, EntityStore store, IdGenerator idGenerator) | |
this.idGenerator = idGenerator; | ||
|
||
long cacheEvictionIntervalInMs = config.get(Configs.CATALOG_CACHE_EVICTION_INTERVAL_MS); | ||
this.catalogCache = | ||
catalogCache = | ||
Caffeine.newBuilder() | ||
.expireAfterAccess(cacheEvictionIntervalInMs, TimeUnit.MILLISECONDS) | ||
.removalListener( | ||
|
@@ -848,12 +848,17 @@ private static boolean catalogInUse(EntityStore store, NameIdentifier ident) | |
|
||
private static boolean getCatalogInUseValue(EntityStore store, NameIdentifier catalogIdent) { | ||
try { | ||
CatalogEntity catalogEntity = | ||
store.get(catalogIdent, EntityType.CATALOG, CatalogEntity.class); | ||
CatalogWrapper wrapper = catalogCache.getIfPresent(catalogIdent); | ||
CatalogEntity catalogEntity; | ||
if (wrapper != null) { | ||
catalogEntity = wrapper.catalog.entity(); | ||
} else { | ||
catalogEntity = store.get(catalogIdent, EntityType.CATALOG, CatalogEntity.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we put this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Loading |
||
} | ||
|
||
return (boolean) | ||
BASIC_CATALOG_PROPERTIES_METADATA.getOrDefault( | ||
catalogEntity.getProperties(), PROPERTY_IN_USE); | ||
|
||
} catch (NoSuchEntityException e) { | ||
LOG.warn("Catalog {} does not exist", catalogIdent, e); | ||
throw new NoSuchCatalogException(CATALOG_DOES_NOT_EXIST_MSG, catalogIdent); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,13 +20,20 @@ | |
|
||
import static org.apache.gravitino.Metalake.PROPERTY_IN_USE; | ||
|
||
import com.github.benmanes.caffeine.cache.Cache; | ||
import com.github.benmanes.caffeine.cache.Caffeine; | ||
import com.github.benmanes.caffeine.cache.Scheduler; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.collect.Maps; | ||
import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
import java.io.IOException; | ||
import java.time.Instant; | ||
import java.util.Arrays; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ScheduledThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
import org.apache.gravitino.Entity.EntityType; | ||
import org.apache.gravitino.EntityAlreadyExistsException; | ||
import org.apache.gravitino.EntityStore; | ||
|
@@ -64,6 +71,21 @@ public class MetalakeManager implements MetalakeDispatcher { | |
|
||
private final IdGenerator idGenerator; | ||
|
||
@VisibleForTesting | ||
static final Cache<NameIdentifier, BaseMetalake> METALAKE_CACHE = | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If our goal is to accelerate the acquisition of There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Caching There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think caching the metalake is better: because the number of metalakes is quite limited, we can improve performance a lot with a small amount of memory, so it is worth caching the metalake. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do you use uppercase for this variable? Typically, we only use uppercase letters for There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It indeed has |
||
Caffeine.newBuilder() | ||
.expireAfterAccess(24, TimeUnit.HOURS) | ||
.removalListener((k, v, c) -> LOG.info("Closing metalake {}.", k)) | ||
.scheduler( | ||
Scheduler.forScheduledExecutorService( | ||
new ScheduledThreadPoolExecutor( | ||
1, | ||
new ThreadFactoryBuilder() | ||
.setDaemon(true) | ||
.setNameFormat("metalake-cleaner-%d") | ||
.build()))) | ||
.build(); | ||
|
||
/** | ||
* Constructs a MetalakeManager instance. | ||
* | ||
|
@@ -73,6 +95,13 @@ public class MetalakeManager implements MetalakeDispatcher { | |
public MetalakeManager(EntityStore store, IdGenerator idGenerator) { | ||
this.store = store; | ||
this.idGenerator = idGenerator; | ||
|
||
// pre-load all metalakes and put them into cache, this is useful when user load schema/table | ||
// directly without list/get metalake first. | ||
BaseMetalake[] metalakes = listMetalakes(); | ||
for (BaseMetalake metalake : metalakes) { | ||
METALAKE_CACHE.put(metalake.nameIdentifier(), metalake); | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -103,10 +132,12 @@ public static void checkMetalake(NameIdentifier ident, EntityStore store) | |
public static boolean metalakeInUse(EntityStore store, NameIdentifier ident) | ||
throws NoSuchMetalakeException { | ||
try { | ||
BaseMetalake metalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class); | ||
BaseMetalake metalake = METALAKE_CACHE.getIfPresent(ident); | ||
if (metalake == null) { | ||
metalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class); | ||
} | ||
Comment on lines
+135
to
+138
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why not cache the result after getting it from the store? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please see #6569 (comment) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. After the user alters the metalake and then loads a schema/table directly without listing/getting the metalake first, the cache will never be hit, right? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah. Altering a metalake is not a frequent operation, so I guess it's acceptable. |
||
return (boolean) | ||
metalake.propertiesMetadata().getOrDefault(metalake.properties(), PROPERTY_IN_USE); | ||
|
||
} catch (NoSuchEntityException e) { | ||
LOG.warn("Metalake {} does not exist", ident, e); | ||
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); | ||
|
@@ -149,20 +180,24 @@ public BaseMetalake[] listMetalakes() { | |
*/ | ||
@Override | ||
public BaseMetalake loadMetalake(NameIdentifier ident) throws NoSuchMetalakeException { | ||
try { | ||
BaseMetalake baseMetalake = | ||
TreeLockUtils.doWithTreeLock( | ||
ident, | ||
LockType.READ, | ||
() -> store.get(ident, EntityType.METALAKE, BaseMetalake.class)); | ||
return newMetalakeWithResolvedProperties(baseMetalake); | ||
} catch (NoSuchEntityException e) { | ||
LOG.warn("Metalake {} does not exist", ident, e); | ||
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); | ||
} catch (IOException ioe) { | ||
LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe); | ||
throw new RuntimeException(ioe); | ||
} | ||
return METALAKE_CACHE.get( | ||
ident, | ||
k -> { | ||
try { | ||
BaseMetalake baseMetalake = | ||
TreeLockUtils.doWithTreeLock( | ||
ident, | ||
LockType.READ, | ||
() -> store.get(ident, EntityType.METALAKE, BaseMetalake.class)); | ||
return newMetalakeWithResolvedProperties(baseMetalake); | ||
} catch (NoSuchEntityException e) { | ||
LOG.warn("Metalake {} does not exist", ident, e); | ||
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); | ||
} catch (IOException ioe) { | ||
LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe); | ||
throw new RuntimeException(ioe); | ||
} | ||
}); | ||
} | ||
|
||
private BaseMetalake newMetalakeWithResolvedProperties(BaseMetalake metalakeEntity) { | ||
|
@@ -222,6 +257,7 @@ public BaseMetalake createMetalake( | |
() -> { | ||
try { | ||
store.put(metalake, false /* overwritten */); | ||
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(metalake)); | ||
return metalake; | ||
} catch (EntityAlreadyExistsException | AlreadyExistsException e) { | ||
LOG.warn("Metalake {} already exists", ident, e); | ||
|
@@ -253,22 +289,24 @@ public BaseMetalake alterMetalake(NameIdentifier ident, MetalakeChange... change | |
throw new MetalakeNotInUseException( | ||
"Metalake %s is not in use, please enable it first", ident); | ||
} | ||
|
||
return store.update( | ||
ident, | ||
BaseMetalake.class, | ||
EntityType.METALAKE, | ||
metalake -> { | ||
BaseMetalake.Builder builder = newMetalakeBuilder(metalake); | ||
Map<String, String> newProps = | ||
metalake.properties() == null | ||
? Maps.newHashMap() | ||
: Maps.newHashMap(metalake.properties()); | ||
builder = updateEntity(builder, newProps, changes); | ||
|
||
return builder.build(); | ||
}); | ||
|
||
METALAKE_CACHE.invalidate(ident); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can the result be cached after updating? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Putting it back into the cache seems optional if this operation is not frequently called. Anyway, this is an improvement; let me check if we can add it back. |
||
BaseMetalake baseMetalake = | ||
store.update( | ||
ident, | ||
BaseMetalake.class, | ||
EntityType.METALAKE, | ||
metalake -> { | ||
BaseMetalake.Builder builder = newMetalakeBuilder(metalake); | ||
Map<String, String> newProps = | ||
metalake.properties() == null | ||
? Maps.newHashMap() | ||
: Maps.newHashMap(metalake.properties()); | ||
builder = updateEntity(builder, newProps, changes); | ||
|
||
return builder.build(); | ||
}); | ||
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(baseMetalake)); | ||
return baseMetalake; | ||
} catch (NoSuchEntityException ne) { | ||
LOG.warn("Metalake {} does not exist", ident, ne); | ||
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); | ||
|
@@ -305,6 +343,8 @@ public boolean dropMetalake(NameIdentifier ident, boolean force) | |
"Metalake %s is in use, please disable it first or use force option", ident); | ||
} | ||
|
||
METALAKE_CACHE.invalidate(ident); | ||
|
||
List<CatalogEntity> catalogEntities = | ||
store.list(Namespace.of(ident.name()), CatalogEntity.class, EntityType.CATALOG); | ||
if (!catalogEntities.isEmpty() && !force) { | ||
|
@@ -331,22 +371,25 @@ public void enableMetalake(NameIdentifier ident) throws NoSuchMetalakeException | |
try { | ||
boolean inUse = metalakeInUse(store, ident); | ||
if (!inUse) { | ||
store.update( | ||
ident, | ||
BaseMetalake.class, | ||
EntityType.METALAKE, | ||
metalake -> { | ||
BaseMetalake.Builder builder = newMetalakeBuilder(metalake); | ||
|
||
Map<String, String> newProps = | ||
metalake.properties() == null | ||
? Maps.newHashMap() | ||
: Maps.newHashMap(metalake.properties()); | ||
newProps.put(PROPERTY_IN_USE, "true"); | ||
builder.withProperties(newProps); | ||
|
||
return builder.build(); | ||
}); | ||
METALAKE_CACHE.invalidate(ident); | ||
BaseMetalake baseMetalake = | ||
store.update( | ||
ident, | ||
BaseMetalake.class, | ||
EntityType.METALAKE, | ||
metalake -> { | ||
BaseMetalake.Builder builder = newMetalakeBuilder(metalake); | ||
|
||
Map<String, String> newProps = | ||
metalake.properties() == null | ||
? Maps.newHashMap() | ||
: Maps.newHashMap(metalake.properties()); | ||
newProps.put(PROPERTY_IN_USE, "true"); | ||
builder.withProperties(newProps); | ||
|
||
return builder.build(); | ||
}); | ||
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(baseMetalake)); | ||
} | ||
|
||
return null; | ||
|
@@ -365,6 +408,7 @@ public void disableMetalake(NameIdentifier ident) throws NoSuchMetalakeException | |
try { | ||
boolean inUse = metalakeInUse(store, ident); | ||
if (inUse) { | ||
METALAKE_CACHE.invalidate(ident); | ||
store.update( | ||
ident, | ||
BaseMetalake.class, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we make it static? I assume there will be only one
CatalogManager
, so there should be only one catalogCache
, right? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The check methods
catalogInUse
and metalakeInUse
are both static. If we want to use the cache for them, we need to change it to static.
Yes, there will be only one cache and all catalogs share the same instance. It's not a big problem, I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be named
CATALOG_CACHE
? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be
final
?