feat(cache): add hazelcast distributed cache option #6645

Merged: 9 commits, Jan 19, 2023
3 changes: 3 additions & 0 deletions build.gradle
@@ -98,6 +98,9 @@ project.ext.externalDependency = [
'hadoopCommon':'org.apache.hadoop:hadoop-common:2.7.2',
'hadoopMapreduceClient':'org.apache.hadoop:hadoop-mapreduce-client-core:2.7.2',
'hadoopCommon3':'org.apache.hadoop:hadoop-common:3.3.4',
'hazelcast':'com.hazelcast:hazelcast:5.2.1',
'hazelcastSpring':'com.hazelcast:hazelcast-spring:5.2.1',
'hazelcastTest':'com.hazelcast:hazelcast:5.2.1:tests',
'hibernateCore': 'org.hibernate:hibernate-core:5.2.16.Final',
'httpClient': 'org.apache.httpcomponents:httpclient:4.5.9',
'httpAsyncClient': 'org.apache.httpcomponents:httpasyncclient:4.1.5',
@@ -37,9 +37,12 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.javatuples.Triplet;
import org.springframework.cache.Cache;

import static com.datahub.util.RecordUtils.*;
import static com.linkedin.metadata.search.utils.GZIPUtil.*;


@RequiredArgsConstructor
@Slf4j
@@ -81,14 +84,15 @@ public LineageSearchResult searchAcrossLineage(@Nonnull Urn sourceUrn, @Nonnull
@Nonnull List<String> entities, @Nullable String input, @Nullable Integer maxHops, @Nullable Filter inputFilters,
@Nullable SortCriterion sortCriterion, int from, int size) {
// Cache multihop result for faster performance
Triplet<String, LineageDirection, Integer> cacheKey = Triplet.with(sourceUrn.toString(), direction, maxHops);
CachedEntityLineageResult cachedLineageResult = cacheEnabled
? cache.get(Pair.of(sourceUrn, direction), CachedEntityLineageResult.class) : null;
? cache.get(cacheKey, CachedEntityLineageResult.class) : null;
EntityLineageResult lineageResult;
if (cachedLineageResult == null) {
maxHops = maxHops != null ? maxHops : 1000;
lineageResult = _graphService.getLineage(sourceUrn, direction, 0, MAX_RELATIONSHIPS, maxHops);
if (cacheEnabled) {
cache.put(Pair.of(sourceUrn, direction), new CachedEntityLineageResult(lineageResult, System.currentTimeMillis()));
cache.put(cacheKey, new CachedEntityLineageResult(lineageResult, System.currentTimeMillis()));
}
} else {
lineageResult = cachedLineageResult.getEntityLineageResult();
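A side note on the key change in this hunk: with the old Pair key, results computed for different maxHops values would collide, because the hop limit was not part of the key. Below is a minimal illustration (Pair is org.apache.commons.lang3.tuple.Pair and Triplet is org.javatuples.Triplet, as imported above; the Urn is also flattened to a String, presumably so the key serializes cleanly in a distributed cache):

// Old key: identical for any maxHops, so a result computed with one hop limit
// could be served for a query that asked for a different one.
Pair<Urn, LineageDirection> oldKeyA = Pair.of(sourceUrn, direction);   // request with maxHops = 1
Pair<Urn, LineageDirection> oldKeyB = Pair.of(sourceUrn, direction);   // request with maxHops = 3
// oldKeyA.equals(oldKeyB) == true

// New key: the hop limit is part of the cache identity.
Triplet<String, LineageDirection, Integer> newKeyA = Triplet.with(sourceUrn.toString(), direction, 1);
Triplet<String, LineageDirection, Integer> newKeyB = Triplet.with(sourceUrn.toString(), direction, 3);
// newKeyA.equals(newKeyB) == false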
CacheableSearcher.java
@@ -6,6 +6,7 @@
import com.linkedin.metadata.search.SearchEntityArray;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
@@ -15,6 +16,8 @@
import lombok.Value;
import org.springframework.cache.Cache;

import static com.datahub.util.RecordUtils.*;


/**
* Wrapper class to allow searching in batches and caching the results.
@@ -33,7 +36,7 @@ public class CacheableSearcher<K> {
private final boolean enableCache;

@Value
public static class QueryPagination {
public static class QueryPagination implements Serializable {
int from;
int size;
}
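QueryPagination now implements Serializable because it ends up inside the cache key tuples, and Hazelcast must serialize keys to distribute entries across nodes. A rough sketch of the batching that the class javadoc describes follows; the windowing is an assumption, since the actual getBatchQuerySize implementation is not shown in this diff:

// Assume batches are fixed windows of batchSize hits, so batch i covers
// offsets [i * batchSize, (i + 1) * batchSize).
int batchSize = 100;
int from = 250;
int size = 120;
int firstBatch = from / batchSize;              // 2
int lastBatch = (from + size - 1) / batchSize;  // 3
// Each batch is fetched (or served from cache) independently, keyed by
// cacheKeyGenerator.apply(new QueryPagination(batchId * batchSize, batchSize)),
// and the requested [from, from + size) slice is assembled from those batches.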
@@ -88,16 +91,19 @@ private SearchResult getBatch(int batchId) {
QueryPagination batch = getBatchQuerySize(batchId);
SearchResult result;
if (enableCache()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "getBatch_cache_access").time();
K cacheKey = cacheKeyGenerator.apply(batch);
result = cache.get(cacheKey, SearchResult.class);
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "getBatch_cache_miss").time();
result = searcher.apply(batch);
cache.put(cacheKey, result);
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "getBatch_cache_miss_count").inc();
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getBatch_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "getBatch_cache_access").time();
K cacheKey = cacheKeyGenerator.apply(batch);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(SearchResult.class, json) : null;
Collaborator: Hazelcast requires that the cached object is a string? Or serializable? Can't the record template be serialized by itself?

Collaborator (author): It requires that the object be Serializable. RecordTemplate does not implement Serializable, and any RecordTemplate used in the key or value was throwing errors. I looked into injecting a custom (de)serializer into the Hazelcast serialization config, but this was much easier.
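A minimal sketch of the workaround described in this thread, reusing the cache, cacheKey, and result variables from the surrounding getBatch code and the static RecordUtils imports added in this diff. Only a plain JSON String ever crosses the cache boundary, so the RecordTemplate itself never needs to implement Serializable:

// cache.put(cacheKey, result) would fail with a Hazelcast serialization error,
// because SearchResult (a RecordTemplate) is not Serializable.
cache.put(cacheKey, toJsonString(result));                 // store the JSON form instead
String json = cache.get(cacheKey, String.class);           // read it back as a String
SearchResult restored = json != null ? toRecordTemplate(SearchResult.class, json) : null;

The hunk below applies exactly this pattern inside getBatch.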

cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "getBatch_cache_miss").time();
result = searcher.apply(batch);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "getBatch_cache_miss_count").inc();
}
}
} else {
result = searcher.apply(batch);
CachedEntityLineageResult.java
@@ -1,11 +1,24 @@
package com.linkedin.metadata.search.cache;

import com.linkedin.metadata.graph.EntityLineageResult;
import java.io.Serializable;
import lombok.Data;

import static com.datahub.util.RecordUtils.*;
import static com.linkedin.metadata.search.utils.GZIPUtil.*;


@Data
public class CachedEntityLineageResult {
private final EntityLineageResult entityLineageResult;
public class CachedEntityLineageResult implements Serializable {
private final byte[] entityLineageResult;
Collaborator: Minor: ideally this detail would be hidden from the consumer of CachedEntityLineageResult. We should auto-deserialize it in getEntityLineageResult().
private final long timestamp;

public CachedEntityLineageResult(EntityLineageResult lineageResult, long timestamp) {
this.entityLineageResult = gzipCompress(toJsonString(lineageResult));
Collaborator: Nice encapsulation!

Collaborator: Definitely better than having external consumers needing to know about this internal detail.
this.timestamp = timestamp;
}

public EntityLineageResult getEntityLineageResult() {
return toRecordTemplate(EntityLineageResult.class, gzipDecompress(entityLineageResult));
}
}
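A short usage sketch of the class above, with a hypothetical empty lineage result for illustration. The gzip and JSON handling stays internal, which is the encapsulation point raised in the comments: callers hand in and get back an EntityLineageResult and never see the byte[]:

EntityLineageResult lineage = new EntityLineageResult();   // hypothetical empty result
CachedEntityLineageResult cached = new CachedEntityLineageResult(lineage, System.currentTimeMillis());
// Internally stored as gzip-compressed JSON bytes (Serializable-friendly for Hazelcast);
// the getter decompresses and rehydrates the record transparently.
EntityLineageResult roundTripped = cached.getEntityLineageResult();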
CachingAllEntitiesSearchAggregator.java
@@ -12,6 +12,8 @@
import org.javatuples.Quintet;
import org.springframework.cache.CacheManager;

import static com.datahub.util.RecordUtils.*;


@RequiredArgsConstructor
public class CachingAllEntitiesSearchAggregator {
@@ -27,6 +29,8 @@ public SearchResult getSearchResults(List<String> entities, @Nonnull String inpu
return new CacheableSearcher<>(cacheManager.getCache(ALL_ENTITIES_SEARCH_AGGREGATOR_CACHE_NAME), batchSize,
querySize -> aggregator.search(entities, input, postFilters, sortCriterion, querySize.getFrom(),
querySize.getSize(), searchFlags),
querySize -> Quintet.with(entities, input, postFilters, sortCriterion, querySize), searchFlags, enableCache).getSearchResults(from, size);
querySize -> Quintet.with(entities, input, postFilters != null ? toJsonString(postFilters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null, querySize), searchFlags, enableCache)
.getSearchResults(from, size);
}
}
CachingEntitySearchService.java
@@ -17,6 +17,8 @@
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;

import static com.datahub.util.RecordUtils.*;


@RequiredArgsConstructor
public class CachingEntitySearchService {
@@ -115,7 +117,8 @@ public SearchResult getCachedSearchResults(
cacheManager.getCache(ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME),
batchSize,
querySize -> getRawSearchResults(entityName, query, filters, sortCriterion, querySize.getFrom(), querySize.getSize()),
querySize -> Quintet.with(entityName, query, filters, sortCriterion, querySize), flags, enableCache).getSearchResults(from, size);
querySize -> Quintet.with(entityName, query, filters != null ? toJsonString(filters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null, querySize), flags, enableCache).getSearchResults(from, size);
}


@@ -133,16 +136,19 @@ public AutoCompleteResult getCachedAutoCompleteResults(
Cache cache = cacheManager.getCache(ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME);
AutoCompleteResult result;
if (enableCache(flags)) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "autocomplete_cache_access").time();
Object cacheKey = Quintet.with(entityName, input, field, filters, limit);
result = cache.get(cacheKey, AutoCompleteResult.class);
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "autocomplete_cache_miss").time();
result = getRawAutoCompleteResults(entityName, input, field, filters, limit);
cache.put(cacheKey, result);
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "autocomplete_cache_miss_count").inc();
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getCachedAutoCompleteResults_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "autocomplete_cache_access").time();
Object cacheKey = Quintet.with(entityName, input, field, filters != null ? toJsonString(filters) : null, limit);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(AutoCompleteResult.class, json) : null;
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "autocomplete_cache_miss").time();
result = getRawAutoCompleteResults(entityName, input, field, filters, limit);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "autocomplete_cache_miss_count").inc();
}
}
} else {
result = getRawAutoCompleteResults(entityName, input, field, filters, limit);
@@ -165,16 +171,19 @@ public BrowseResult getCachedBrowseResults(
Cache cache = cacheManager.getCache(ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME);
BrowseResult result;
if (enableCache(flags)) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "browse_cache_access").time();
Object cacheKey = Quintet.with(entityName, path, filters, from, size);
result = cache.get(cacheKey, BrowseResult.class);
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "browse_cache_miss").time();
result = getRawBrowseResults(entityName, path, filters, from, size);
cache.put(cacheKey, result);
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "browse_cache_miss_count").inc();
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getCachedBrowseResults_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "browse_cache_access").time();
Object cacheKey = Quintet.with(entityName, path, filters != null ? toJsonString(filters) : null, from, size);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(BrowseResult.class, json) : null;
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "browse_cache_miss").time();
result = getRawBrowseResults(entityName, path, filters, from, size);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "browse_cache_miss_count").inc();
}
}
} else {
result = getRawBrowseResults(entityName, path, filters, from, size);
GZIPUtil.java (new file)
@@ -0,0 +1,48 @@
package com.linkedin.metadata.search.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;


public class GZIPUtil {
private GZIPUtil() { }

public static String gzipDecompress(byte[] gzipped) {
String unzipped;
try (ByteArrayInputStream bis = new ByteArrayInputStream(gzipped);
GZIPInputStream gis = new GZIPInputStream(bis);
ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
int len;
while ((len = gis.read(buffer)) != -1) {
bos.write(buffer, 0, len);
}
unzipped = bos.toString(StandardCharsets.UTF_8);
} catch (IOException ie) {
throw new IllegalStateException("Error while unzipping value.", ie);
}
return unzipped;
}

public static byte[] gzipCompress(String unzipped) {
byte[] gzipped;
try (ByteArrayInputStream bis = new ByteArrayInputStream(unzipped.getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(bos)) {
byte[] buffer = new byte[1024];
int len;
while ((len = bis.read(buffer)) != -1) {
gzipOutputStream.write(buffer, 0, len);
}
gzipOutputStream.finish();
gzipped = bos.toByteArray();
} catch (IOException ie) {
throw new IllegalStateException("Error while gzipping value: " + unzipped);
}
return gzipped;
}
}
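A quick round-trip sketch of the utility above; the payload is chosen only for illustration, since compression really pays off for large cached values such as full lineage results:

String json = "{\"relationships\":[]}";
byte[] compressed = GZIPUtil.gzipCompress(json);
String restored = GZIPUtil.gzipDecompress(compressed);
// restored.equals(json) == true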
3 changes: 3 additions & 0 deletions metadata-service/factories/build.gradle
@@ -15,6 +15,8 @@ dependencies {
compile externalDependency.elasticSearchRest
compile externalDependency.httpClient
compile externalDependency.gson
compile externalDependency.hazelcast
compile externalDependency.hazelcastSpring
compile externalDependency.kafkaClients
compile externalDependency.kafkaAvroSerde
compileOnly externalDependency.lombok
@@ -39,6 +41,7 @@ dependencies {

testCompile externalDependency.mockito
testCompile externalDependency.testng
testCompile externalDependency.hazelcastTest

}

CacheConfig.java (new file)
@@ -0,0 +1,74 @@
package com.linkedin.gms.factory.common;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.hazelcast.config.Config;
import com.hazelcast.config.EvictionConfig;
import com.hazelcast.config.EvictionPolicy;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizePolicy;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.spring.cache.HazelcastCacheManager;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class CacheConfig {

@Value("${CACHE_TTL_SECONDS:600}")
private int cacheTtlSeconds;

@Value("${CACHE_MAX_SIZE:10000}")
private int cacheMaxSize;

@Value("${searchService.cache.hazelcast.serviceName:hazelcast-service}")
private String hazelcastServiceName;
Collaborator: Why do we have hazelcast in the names of the variables here? Isn't the idea that we can swap in different implementations for the Caffeine cache? Does this seem overly specific?

Collaborator: I see, so we are only accounting for two cases, Caffeine and Hazelcast. I think I get it.

Collaborator (author): Spring's CacheManager abstraction is intended for very simple cases of switching out cache implementations. Unfortunately, our use cases are complex enough that they require the provider-level interfaces, so we would need to reimplement support for any other implementation anyway. This is unchanged from how it previously worked; it just exposes the implementation name in the config. It's possible that another provider would also require a headless Kubernetes service for its distributed cache, but that's unlikely (if we even add another supported cache), so I think it's okay to be specific here.


@Bean
@ConditionalOnProperty(name = "searchService.cacheImplementation", havingValue = "caffeine")
public CacheManager caffeineCacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setCaffeine(caffeineCacheBuilder());
return cacheManager;
}

private Caffeine<Object, Object> caffeineCacheBuilder() {
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(cacheMaxSize)
.expireAfterAccess(cacheTtlSeconds, TimeUnit.SECONDS)
.recordStats();
}

@Bean
@ConditionalOnProperty(name = "searchService.cacheImplementation", havingValue = "hazelcast")
public CacheManager hazelcastCacheManager() {
Config config = new Config();
// TODO: This setting is equivalent to expireAfterAccess, refreshes timer after a get, put, containsKey etc.
// is this behavior what we actually desire? Should we change it now?
MapConfig mapConfig = new MapConfig().setMaxIdleSeconds(cacheTtlSeconds);

EvictionConfig evictionConfig = new EvictionConfig()
.setMaxSizePolicy(MaxSizePolicy.PER_NODE)
.setSize(cacheMaxSize)
.setEvictionPolicy(EvictionPolicy.LFU);
mapConfig.setEvictionConfig(evictionConfig);
mapConfig.setName("default");
config.addMapConfig(mapConfig);

config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true)
.setProperty("service-dns", hazelcastServiceName);


HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);

return new HazelcastCacheManager(hazelcastInstance);
}
}
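A hypothetical consumer, sketched to show why the implementation-specific details above stay contained in this factory: callers inject the single active CacheManager bean, selected by the searchService.cacheImplementation property (caffeine or hazelcast), and use the plain Spring Cache API either way. Note that with the Hazelcast manager, keys and values must be Serializable, and maxIdleSeconds means the expiry timer resets on reads as well as writes:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Component;

// Illustration only; not part of this PR.
@Component
public class CacheUsageExample {

  @Autowired
  private CacheManager cacheManager;   // Caffeine- or Hazelcast-backed, depending on config

  public String cachedGreeting() {
    Cache cache = cacheManager.getCache("default");
    cache.put("greeting", "hello");                  // with Hazelcast, key and value must be Serializable
    return cache.get("greeting", String.class);
  }
}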