Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#6566] improvement(core): Add the cache mechanism for metalake and use cache to load in-use information. #6569

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ private void initGravitinoServerComponents() {
// create and initialize a random id generator
this.idGenerator = new RandomIdGenerator();

// Tree lock
this.lockManager = new LockManager(config);

// Create and initialize metalake related modules, the operation chain is:
// MetalakeEventDispatcher -> MetalakeNormalizeDispatcher -> MetalakeHookDispatcher ->
// MetalakeManager
Expand Down Expand Up @@ -498,9 +501,6 @@ private void initGravitinoServerComponents() {
this.auxServiceManager = new AuxiliaryServiceManager();
this.auxServiceManager.serviceInit(config);

// Tree lock
this.lockManager = new LockManager(config);

// Create and initialize Tag related modules
this.tagDispatcher = new TagEventDispatcher(eventBus, new TagManager(idGenerator, entityStore));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ private ModelCatalog asModels() {

private final Config config;

@VisibleForTesting final Cache<NameIdentifier, CatalogWrapper> catalogCache;
@VisibleForTesting static Cache<NameIdentifier, CatalogWrapper> catalogCache;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we make it static? I assume there will be only one CatalogManager, so there should be only one catalogCache, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we make it static?

The method check catalogInUse and metalakeInUse are all static. If we want to use cache for them, we need to change it to static

assume there will be only one CatalogManager, so there should be only one catalogCache, right?

Yes, there will be only one cache and all catalogs shares the same instance, It's not a big problem I think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should named CATALOG_CACHE?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be final?


private final EntityStore store;

Expand All @@ -281,7 +281,7 @@ public CatalogManager(Config config, EntityStore store, IdGenerator idGenerator)
this.idGenerator = idGenerator;

long cacheEvictionIntervalInMs = config.get(Configs.CATALOG_CACHE_EVICTION_INTERVAL_MS);
this.catalogCache =
catalogCache =
Caffeine.newBuilder()
.expireAfterAccess(cacheEvictionIntervalInMs, TimeUnit.MILLISECONDS)
.removalListener(
Expand Down Expand Up @@ -848,12 +848,17 @@ private static boolean catalogInUse(EntityStore store, NameIdentifier ident)

private static boolean getCatalogInUseValue(EntityStore store, NameIdentifier catalogIdent) {
try {
CatalogEntity catalogEntity =
store.get(catalogIdent, EntityType.CATALOG, CatalogEntity.class);
CatalogWrapper wrapper = catalogCache.getIfPresent(catalogIdent);
CatalogEntity catalogEntity;
if (wrapper != null) {
catalogEntity = wrapper.catalog.entity();
} else {
catalogEntity = store.get(catalogIdent, EntityType.CATALOG, CatalogEntity.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we put this catalogEntity in cache? Besides, can we use loadcatalog directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loading catalogEntity and then transform it to standard CatalogEntity are NOT static method, so I omit it. in fact, in most cases, we can get catalogEntity from the cache as all calls are from catalog operations, and the catalog should be in the cache except the first time.

}

return (boolean)
BASIC_CATALOG_PROPERTIES_METADATA.getOrDefault(
catalogEntity.getProperties(), PROPERTY_IN_USE);

} catch (NoSuchEntityException e) {
LOG.warn("Catalog {} does not exist", catalogIdent, e);
throw new NoSuchCatalogException(CATALOG_DOES_NOT_EXIST_MSG, catalogIdent);
Expand Down
140 changes: 92 additions & 48 deletions core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@

import static org.apache.gravitino.Metalake.PROPERTY_IN_USE;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.Entity.EntityType;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
Expand Down Expand Up @@ -64,6 +71,21 @@ public class MetalakeManager implements MetalakeDispatcher {

private final IdGenerator idGenerator;

@VisibleForTesting
static final Cache<NameIdentifier, BaseMetalake> METALAKE_CACHE =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If our goal is to accelerate the acquisition of in-use, it seems that we only need to cache the corresponding in-use value, and do not need to cache BaseMetalake.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cacheing BaseMetalake will take only a little memory and can use it when loadingMetalake by the way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think caching metalake is better, because the amount of metalake is quite limited, with small memory size we can improve the performance a lot, it is worthy to cache the metalake.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you use uppercase for this variable? Typically, we only use uppercase letter for final variable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It indeed has final flag, see static final Cache<NameIdentifier, BaseMetalake> METALAKE_CACHE

Caffeine.newBuilder()
.expireAfterAccess(24, TimeUnit.HOURS)
.removalListener((k, v, c) -> LOG.info("Closing metalake {}.", k))
.scheduler(
Scheduler.forScheduledExecutorService(
new ScheduledThreadPoolExecutor(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("metalake-cleaner-%d")
.build())))
.build();

/**
* Constructs a MetalakeManager instance.
*
Expand All @@ -73,6 +95,13 @@ public class MetalakeManager implements MetalakeDispatcher {
public MetalakeManager(EntityStore store, IdGenerator idGenerator) {
this.store = store;
this.idGenerator = idGenerator;

// pre-load all metalakes and put them into cache, this is useful when user load schema/table
// directly without list/get metalake first.
BaseMetalake[] metalakes = listMetalakes();
for (BaseMetalake metalake : metalakes) {
METALAKE_CACHE.put(metalake.nameIdentifier(), metalake);
}
}

/**
Expand Down Expand Up @@ -103,10 +132,12 @@ public static void checkMetalake(NameIdentifier ident, EntityStore store)
public static boolean metalakeInUse(EntityStore store, NameIdentifier ident)
throws NoSuchMetalakeException {
try {
BaseMetalake metalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class);
BaseMetalake metalake = METALAKE_CACHE.getIfPresent(ident);
if (metalake == null) {
metalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class);
}
Comment on lines +135 to +138
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not cache the result after getting it from the store?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see #6569 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after the user alters the metalake, then load schema/table directly without list/get the metalake, the cache never be hitting, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. alter a metalake is not operated frequently, so I guess it's acceptable

return (boolean)
metalake.propertiesMetadata().getOrDefault(metalake.properties(), PROPERTY_IN_USE);

} catch (NoSuchEntityException e) {
LOG.warn("Metalake {} does not exist", ident, e);
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident);
Expand Down Expand Up @@ -149,20 +180,24 @@ public BaseMetalake[] listMetalakes() {
*/
@Override
public BaseMetalake loadMetalake(NameIdentifier ident) throws NoSuchMetalakeException {
try {
BaseMetalake baseMetalake =
TreeLockUtils.doWithTreeLock(
ident,
LockType.READ,
() -> store.get(ident, EntityType.METALAKE, BaseMetalake.class));
return newMetalakeWithResolvedProperties(baseMetalake);
} catch (NoSuchEntityException e) {
LOG.warn("Metalake {} does not exist", ident, e);
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident);
} catch (IOException ioe) {
LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe);
throw new RuntimeException(ioe);
}
return METALAKE_CACHE.get(
ident,
k -> {
try {
BaseMetalake baseMetalake =
TreeLockUtils.doWithTreeLock(
ident,
LockType.READ,
() -> store.get(ident, EntityType.METALAKE, BaseMetalake.class));
return newMetalakeWithResolvedProperties(baseMetalake);
} catch (NoSuchEntityException e) {
LOG.warn("Metalake {} does not exist", ident, e);
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident);
} catch (IOException ioe) {
LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe);
throw new RuntimeException(ioe);
}
});
}

private BaseMetalake newMetalakeWithResolvedProperties(BaseMetalake metalakeEntity) {
Expand Down Expand Up @@ -222,6 +257,7 @@ public BaseMetalake createMetalake(
() -> {
try {
store.put(metalake, false /* overwritten */);
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(metalake));
return metalake;
} catch (EntityAlreadyExistsException | AlreadyExistsException e) {
LOG.warn("Metalake {} already exists", ident, e);
Expand Down Expand Up @@ -253,22 +289,24 @@ public BaseMetalake alterMetalake(NameIdentifier ident, MetalakeChange... change
throw new MetalakeNotInUseException(
"Metalake %s is not in use, please enable it first", ident);
}

return store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);
Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
builder = updateEntity(builder, newProps, changes);

return builder.build();
});

METALAKE_CACHE.invalidate(ident);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the result be cached after updating?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put it back to cache seems to be an optional if this optional is not frequently called. Anyway, this is an improvement, let me check if we can add it back.

BaseMetalake baseMetalake =
store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);
Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
builder = updateEntity(builder, newProps, changes);

return builder.build();
});
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(baseMetalake));
return baseMetalake;
} catch (NoSuchEntityException ne) {
LOG.warn("Metalake {} does not exist", ident, ne);
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident);
Expand Down Expand Up @@ -305,6 +343,8 @@ public boolean dropMetalake(NameIdentifier ident, boolean force)
"Metalake %s is in use, please disable it first or use force option", ident);
}

METALAKE_CACHE.invalidate(ident);

List<CatalogEntity> catalogEntities =
store.list(Namespace.of(ident.name()), CatalogEntity.class, EntityType.CATALOG);
if (!catalogEntities.isEmpty() && !force) {
Expand All @@ -331,22 +371,25 @@ public void enableMetalake(NameIdentifier ident) throws NoSuchMetalakeException
try {
boolean inUse = metalakeInUse(store, ident);
if (!inUse) {
store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);

Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
newProps.put(PROPERTY_IN_USE, "true");
builder.withProperties(newProps);

return builder.build();
});
METALAKE_CACHE.invalidate(ident);
BaseMetalake baseMetalake =
store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);

Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
newProps.put(PROPERTY_IN_USE, "true");
builder.withProperties(newProps);

return builder.build();
});
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(baseMetalake));
}

return null;
Expand All @@ -365,6 +408,7 @@ public void disableMetalake(NameIdentifier ident) throws NoSuchMetalakeException
try {
boolean inUse = metalakeInUse(store, ident);
if (inUse) {
METALAKE_CACHE.invalidate(ident);
store.update(
ident,
BaseMetalake.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void testCreateCatalog() {
testProperties(props, testCatalog.properties());
Assertions.assertEquals(Catalog.Type.RELATIONAL, testCatalog.type());

Assertions.assertNotNull(catalogManager.catalogCache.getIfPresent(ident));
Assertions.assertNotNull(CatalogManager.catalogCache.getIfPresent(ident));

// test before creation
NameIdentifier ident2 = NameIdentifier.of("metalake1", "test1");
Expand All @@ -265,7 +265,7 @@ public void testCreateCatalog() {
catalogManager.createCatalog(
ident2, Catalog.Type.RELATIONAL, provider, "comment", props));
Assertions.assertTrue(exception1.getMessage().contains("Metalake metalake1 does not exist"));
Assertions.assertNull(catalogManager.catalogCache.getIfPresent(ident2));
Assertions.assertNull(CatalogManager.catalogCache.getIfPresent(ident2));

// test before creation
Assertions.assertThrows(
Expand All @@ -285,7 +285,7 @@ public void testCreateCatalog() {
exception2.getMessage().contains("Catalog metalake.test1 already exists"));

// Test if the catalog is already cached
CatalogManager.CatalogWrapper cached = catalogManager.catalogCache.getIfPresent(ident);
CatalogManager.CatalogWrapper cached = CatalogManager.catalogCache.getIfPresent(ident);
Assertions.assertNotNull(cached);

// Test failed creation
Expand All @@ -300,7 +300,7 @@ public void testCreateCatalog() {
Assertions.assertTrue(
exception3.getMessage().contains("Properties are reserved and cannot be set"),
exception3.getMessage());
Assertions.assertNull(catalogManager.catalogCache.getIfPresent(failedIdent));
Assertions.assertNull(CatalogManager.catalogCache.getIfPresent(failedIdent));
// Test failed for the second time
Throwable exception4 =
Assertions.assertThrows(
Expand All @@ -311,7 +311,7 @@ public void testCreateCatalog() {
Assertions.assertTrue(
exception4.getMessage().contains("Properties are reserved and cannot be set"),
exception4.getMessage());
Assertions.assertNull(catalogManager.catalogCache.getIfPresent(failedIdent));
Assertions.assertNull(CatalogManager.catalogCache.getIfPresent(failedIdent));
}

@Test
Expand Down Expand Up @@ -394,7 +394,7 @@ public void testLoadCatalog() {
exception.getMessage().contains("Catalog metalake.test22 does not exist"));

// Load operation will cache the catalog
Assertions.assertNotNull(catalogManager.catalogCache.getIfPresent(ident));
Assertions.assertNotNull(CatalogManager.catalogCache.getIfPresent(ident));
}

@Test
Expand Down Expand Up @@ -440,8 +440,8 @@ public void testAlterCatalog() {
exception.getMessage().contains("Catalog metalake.test33 does not exist"));

// Alter operation will update the cache
Assertions.assertNull(catalogManager.catalogCache.getIfPresent(ident));
Assertions.assertNotNull(catalogManager.catalogCache.getIfPresent(ident1));
Assertions.assertNull(CatalogManager.catalogCache.getIfPresent(ident));
Assertions.assertNotNull(CatalogManager.catalogCache.getIfPresent(ident1));
}

@Test
Expand Down Expand Up @@ -469,7 +469,7 @@ public void testDropCatalog() {
Assertions.assertFalse(dropped1);

// Drop operation will update the cache
Assertions.assertNull(catalogManager.catalogCache.getIfPresent(ident));
Assertions.assertNull(CatalogManager.catalogCache.getIfPresent(ident));
}

@Test
Expand Down
Loading
Loading