-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(cache): add hazelcast distributed cache option #6645
Changes from 6 commits
1fab4e7
8b8bb23
d082bb0
988a71b
dd42b7c
ddf9f0c
c06ea1e
f5bdfe5
e2bdcde
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,11 @@ | ||
package com.linkedin.metadata.search.cache; | ||
|
||
import com.linkedin.metadata.graph.EntityLineageResult; | ||
import java.io.Serializable; | ||
import lombok.Data; | ||
|
||
|
||
@Data | ||
public class CachedEntityLineageResult { | ||
private final EntityLineageResult entityLineageResult; | ||
public class CachedEntityLineageResult implements Serializable { | ||
private final byte[] entityLineageResult; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: Ideally this detail would be hidden to consumer of the CachedEntityLineageResult. We should auto deserialize it on "getEntityLineageResult" |
||
private final long timestamp; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package com.linkedin.metadata.search.utils; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.zip.GZIPInputStream; | ||
import java.util.zip.GZIPOutputStream; | ||
|
||
|
||
/** Utility for GZIP-compressing UTF-8 strings, used to shrink cached values. */
public class GZIPUtil {
  // Static-only utility; prevent instantiation.
  private GZIPUtil() { }

  /**
   * Decompresses a GZIP byte array into a UTF-8 string.
   *
   * @param gzipped GZIP-compressed bytes, as produced by {@link #gzipCompress(String)}
   * @return the decompressed UTF-8 string
   * @throws IllegalStateException if the bytes are not valid GZIP data
   */
  public static String gzipDecompress(byte[] gzipped) {
    // try-with-resources on BOTH streams so they are closed even when
    // GZIPInputStream construction or reading throws (the original leaked them).
    try (GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(gzipped));
        ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
      gis.transferTo(bos); // Java 9+: replaces the manual byte[1024] copy loop
      return bos.toString(StandardCharsets.UTF_8);
    } catch (IOException ie) {
      throw new IllegalStateException("Error while unzipping value.", ie);
    }
  }

  /**
   * Compresses a UTF-8 string into a GZIP byte array.
   *
   * @param unzipped the string to compress
   * @return GZIP-compressed bytes of the string's UTF-8 encoding
   * @throws IllegalStateException if compression fails
   */
  public static byte[] gzipCompress(String unzipped) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    // Closing the GZIPOutputStream (via try-with-resources) also finishes the
    // GZIP stream, so no explicit finish() call is needed.
    try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(bos)) {
      gzipOutputStream.write(unzipped.getBytes(StandardCharsets.UTF_8));
    } catch (IOException ie) {
      // Preserve the cause (the original dropped it) and avoid dumping the
      // potentially large input value into the exception message.
      throw new IllegalStateException("Error while gzipping value.", ie);
    }
    return bos.toByteArray();
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package com.linkedin.gms.factory.common; | ||
|
||
import com.github.benmanes.caffeine.cache.Caffeine; | ||
import com.hazelcast.config.Config; | ||
import com.hazelcast.config.EvictionConfig; | ||
import com.hazelcast.config.EvictionPolicy; | ||
import com.hazelcast.config.MapConfig; | ||
import com.hazelcast.config.MaxSizePolicy; | ||
import com.hazelcast.core.Hazelcast; | ||
import com.hazelcast.core.HazelcastInstance; | ||
import com.hazelcast.spring.cache.HazelcastCacheManager; | ||
import java.util.concurrent.TimeUnit; | ||
import org.springframework.beans.factory.annotation.Value; | ||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | ||
import org.springframework.cache.CacheManager; | ||
import org.springframework.cache.caffeine.CaffeineCacheManager; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
|
||
|
||
@Configuration | ||
public class CacheConfig { | ||
|
||
@Value("${CACHE_TTL_SECONDS:600}") | ||
private int cacheTtlSeconds; | ||
|
||
@Value("${CACHE_MAX_SIZE:10000}") | ||
private int cacheMaxSize; | ||
|
||
@Value("${searchService.cache.hazelcast.serviceName:hazelcast-service}") | ||
private String hazelcastServiceName; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we have hazelcast in the name of variables here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i see - so we are only accounting for 2 cases: caffeine and hazel. i think i get it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Spring Cache Manager is intended for very very simple use cases when switching out cache implementations. Unfortunately our use cases are complex enough where it requires using the provider level interfaces so we would need to reimplement any other implementations we support. This is unchanged from how it previously was, but just exposes the implementation name in the config. It's possible that other providers would also require a headless K8s service for their distributed cache implementation, but unlikely (if we even do implement another underlying supported cache) so I think it's okay to be specific here. |
||
|
||
@Bean | ||
@ConditionalOnProperty(name = "searchService.cacheImplementation", havingValue = "caffeine") | ||
public CacheManager caffeineCacheManager() { | ||
CaffeineCacheManager cacheManager = new CaffeineCacheManager(); | ||
cacheManager.setCaffeine(caffeineCacheBuilder()); | ||
return cacheManager; | ||
} | ||
|
||
private Caffeine<Object, Object> caffeineCacheBuilder() { | ||
return Caffeine.newBuilder() | ||
.initialCapacity(100) | ||
.maximumSize(cacheMaxSize) | ||
.expireAfterAccess(cacheTtlSeconds, TimeUnit.SECONDS) | ||
.recordStats(); | ||
} | ||
|
||
@Bean | ||
@ConditionalOnProperty(name = "searchService.cacheImplementation", havingValue = "hazelcast") | ||
public CacheManager hazelcastCacheManager() { | ||
Config config = new Config(); | ||
// TODO: This setting is equivalent to expireAfterAccess, refreshes timer after a get, put, containsKey etc. | ||
// is this behavior what we actually desire? Should we change it now? | ||
MapConfig mapConfig = new MapConfig().setMaxIdleSeconds(cacheTtlSeconds); | ||
|
||
EvictionConfig evictionConfig = new EvictionConfig() | ||
.setMaxSizePolicy(MaxSizePolicy.PER_NODE) | ||
.setSize(cacheMaxSize) | ||
.setEvictionPolicy(EvictionPolicy.LFU); | ||
mapConfig.setEvictionConfig(evictionConfig); | ||
mapConfig.setName("default"); | ||
config.addMapConfig(mapConfig); | ||
|
||
config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false); | ||
config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true) | ||
.setProperty("service-dns", hazelcastServiceName); | ||
|
||
|
||
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config); | ||
|
||
return new HazelcastCacheManager(hazelcastInstance); | ||
} | ||
} |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does Hazelcast require that the cached object be a string, or just serializable? Can't the RecordTemplate be serialized by itself?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Requires that it be serializable, RecordTemplate does not implement Serializable and any RecordTemplate being used in the key or value was throwing errors. Looked into trying to inject a custom deserializer into the Hazelcast deserialization config, but this was much easier.