feat(cache): add hazelcast distributed cache option (datahub-project#6645)

Co-authored-by: Aseem Bansal <[email protected]>
2 people authored and Eric Yomi committed Feb 8, 2023
1 parent 860fee2 commit 81db349
Showing 12 changed files with 316 additions and 73 deletions.
3 changes: 3 additions & 0 deletions build.gradle
@@ -98,6 +98,9 @@ project.ext.externalDependency = [
'hadoopCommon':'org.apache.hadoop:hadoop-common:2.7.2',
'hadoopMapreduceClient':'org.apache.hadoop:hadoop-mapreduce-client-core:2.7.2',
'hadoopCommon3':'org.apache.hadoop:hadoop-common:3.3.4',
'hazelcast':'com.hazelcast:hazelcast:5.2.1',
'hazelcastSpring':'com.hazelcast:hazelcast-spring:5.2.1',
'hazelcastTest':'com.hazelcast:hazelcast:5.2.1:tests',
'hibernateCore': 'org.hibernate:hibernate-core:5.2.16.Final',
'httpClient': 'org.apache.httpcomponents:httpclient:4.5.9',
'httpAsyncClient': 'org.apache.httpcomponents:httpasyncclient:4.1.5',
@@ -37,9 +37,12 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.javatuples.Triplet;
import org.springframework.cache.Cache;

import static com.datahub.util.RecordUtils.*;
import static com.linkedin.metadata.search.utils.GZIPUtil.*;


@RequiredArgsConstructor
@Slf4j
@@ -81,14 +84,15 @@ public LineageSearchResult searchAcrossLineage(@Nonnull Urn sourceUrn, @Nonnull
@Nonnull List<String> entities, @Nullable String input, @Nullable Integer maxHops, @Nullable Filter inputFilters,
@Nullable SortCriterion sortCriterion, int from, int size) {
// Cache multihop result for faster performance
Triplet<String, LineageDirection, Integer> cacheKey = Triplet.with(sourceUrn.toString(), direction, maxHops);
CachedEntityLineageResult cachedLineageResult = cacheEnabled
? cache.get(Pair.of(sourceUrn, direction), CachedEntityLineageResult.class) : null;
? cache.get(cacheKey, CachedEntityLineageResult.class) : null;
EntityLineageResult lineageResult;
if (cachedLineageResult == null) {
maxHops = maxHops != null ? maxHops : 1000;
lineageResult = _graphService.getLineage(sourceUrn, direction, 0, MAX_RELATIONSHIPS, maxHops);
if (cacheEnabled) {
cache.put(Pair.of(sourceUrn, direction), new CachedEntityLineageResult(lineageResult, System.currentTimeMillis()));
cache.put(cacheKey, new CachedEntityLineageResult(lineageResult, System.currentTimeMillis()));
}
} else {
lineageResult = cachedLineageResult.getEntityLineageResult();
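The key change above swaps the old `Pair.of(sourceUrn, direction)` for a `Triplet` of plain, serializable parts and folds `maxHops` into the key's identity, which matters once the entry can live in a distributed Hazelcast map rather than an in-process Caffeine cache. A minimal sketch of that key construction (illustrative, not part of the commit; the `Urn` and `LineageDirection` import paths are assumed since the hunk does not show them):

import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.graph.LineageDirection;
import org.javatuples.Triplet;

final class LineageCacheKeySketch {
  private LineageCacheKeySketch() { }

  // Stringify the urn so key equality and hashing do not depend on Urn's own
  // serialization, and include maxHops so different hop limits cache separately.
  static Triplet<String, LineageDirection, Integer> lineageKey(
      Urn sourceUrn, LineageDirection direction, Integer maxHops) {
    return Triplet.with(sourceUrn.toString(), direction, maxHops);
  }
}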
@@ -6,6 +6,7 @@
import com.linkedin.metadata.search.SearchEntityArray;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
@@ -15,6 +16,8 @@
import lombok.Value;
import org.springframework.cache.Cache;

import static com.datahub.util.RecordUtils.*;


/**
* Wrapper class to allow searching in batches and caching the results.
@@ -33,7 +36,7 @@ public class CacheableSearcher<K> {
private final boolean enableCache;

@Value
public static class QueryPagination {
public static class QueryPagination implements Serializable {
int from;
int size;
}
@@ -88,16 +91,19 @@ private SearchResult getBatch(int batchId) {
QueryPagination batch = getBatchQuerySize(batchId);
SearchResult result;
if (enableCache()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "getBatch_cache_access").time();
K cacheKey = cacheKeyGenerator.apply(batch);
result = cache.get(cacheKey, SearchResult.class);
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "getBatch_cache_miss").time();
result = searcher.apply(batch);
cache.put(cacheKey, result);
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "getBatch_cache_miss_count").inc();
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getBatch_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "getBatch_cache_access").time();
K cacheKey = cacheKeyGenerator.apply(batch);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(SearchResult.class, json) : null;
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "getBatch_cache_miss").time();
result = searcher.apply(batch);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "getBatch_cache_miss_count").inc();
}
}
} else {
result = searcher.apply(batch);
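The reworked batch lookup above caches `SearchResult` values as JSON strings via the `RecordUtils` helpers, so each entry is a plain `String` that any backend, including a remote Hazelcast member, can store and return. A hedged sketch of that convention as a reusable pair of helpers (the class is illustrative, not part of the commit):

import com.linkedin.data.template.RecordTemplate;
import org.springframework.cache.Cache;

import static com.datahub.util.RecordUtils.toJsonString;
import static com.datahub.util.RecordUtils.toRecordTemplate;

final class JsonCacheSupport {
  private JsonCacheSupport() { }

  // Write side: serialize the record to JSON before it enters the cache.
  static void putRecord(Cache cache, Object key, RecordTemplate value) {
    cache.put(key, toJsonString(value));
  }

  // Read side: a hit comes back as a String and is rebuilt into the typed record;
  // a miss returns null, exactly like Cache#get.
  static <T extends RecordTemplate> T getRecord(Cache cache, Object key, Class<T> type) {
    String json = cache.get(key, String.class);
    return json != null ? toRecordTemplate(type, json) : null;
  }
}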
@@ -1,11 +1,24 @@
package com.linkedin.metadata.search.cache;

import com.linkedin.metadata.graph.EntityLineageResult;
import java.io.Serializable;
import lombok.Data;

import static com.datahub.util.RecordUtils.*;
import static com.linkedin.metadata.search.utils.GZIPUtil.*;


@Data
public class CachedEntityLineageResult {
private final EntityLineageResult entityLineageResult;
public class CachedEntityLineageResult implements Serializable {
private final byte[] entityLineageResult;
private final long timestamp;

public CachedEntityLineageResult(EntityLineageResult lineageResult, long timestamp) {
this.entityLineageResult = gzipCompress(toJsonString(lineageResult));
this.timestamp = timestamp;
}

public EntityLineageResult getEntityLineageResult() {
return toRecordTemplate(EntityLineageResult.class, gzipDecompress(entityLineageResult));
}
}
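With the rewrite above, the wrapper no longer holds a live `EntityLineageResult`; it keeps gzip-compressed JSON bytes that are cheap for Hazelcast to replicate and are rebuilt only when read. A small usage sketch (illustrative; `lineageResult` stands in for a `GraphService#getLineage` result as in the `searchAcrossLineage` hunk above):

import com.linkedin.metadata.graph.EntityLineageResult;
import com.linkedin.metadata.search.cache.CachedEntityLineageResult;

final class LineageCacheRoundTrip {
  private LineageCacheRoundTrip() { }

  static EntityLineageResult roundTrip(EntityLineageResult lineageResult) {
    // The constructor gzips the JSON form of the result up front.
    CachedEntityLineageResult cached =
        new CachedEntityLineageResult(lineageResult, System.currentTimeMillis());
    // The getter (possibly on another node) decompresses and re-parses on demand.
    return cached.getEntityLineageResult();
  }
}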
@@ -12,6 +12,8 @@
import org.javatuples.Quintet;
import org.springframework.cache.CacheManager;

import static com.datahub.util.RecordUtils.*;


@RequiredArgsConstructor
public class CachingAllEntitiesSearchAggregator {
@@ -27,6 +29,8 @@ public SearchResult getSearchResults(List<String> entities, @Nonnull String inpu
return new CacheableSearcher<>(cacheManager.getCache(ALL_ENTITIES_SEARCH_AGGREGATOR_CACHE_NAME), batchSize,
querySize -> aggregator.search(entities, input, postFilters, sortCriterion, querySize.getFrom(),
querySize.getSize(), searchFlags),
querySize -> Quintet.with(entities, input, postFilters, sortCriterion, querySize), searchFlags, enableCache).getSearchResults(from, size);
querySize -> Quintet.with(entities, input, postFilters != null ? toJsonString(postFilters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null, querySize), searchFlags, enableCache)
.getSearchResults(from, size);
}
}
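As with the other caches in this change, the aggregator key now carries `Filter` and `SortCriterion` as JSON strings (or `null`), so key equality does not hinge on those records' own serialization once the key sits in a shared map. A hedged sketch of the key shape (illustrative; the `Filter`/`SortCriterion` import paths are assumed, and `querySize` stands in for the batch's `QueryPagination`):

import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import java.util.List;
import org.javatuples.Quintet;

import static com.datahub.util.RecordUtils.toJsonString;

final class AggregatorCacheKeySketch {
  private AggregatorCacheKeySketch() { }

  // Nullable filter and sort are serialized to JSON so the whole key is a plain,
  // comparable value; querySize is the (Serializable) QueryPagination batch.
  static Quintet<List<String>, String, String, String, Object> aggregatorKey(
      List<String> entities, String input, Filter postFilters,
      SortCriterion sortCriterion, Object querySize) {
    return Quintet.with(entities, input,
        postFilters != null ? toJsonString(postFilters) : null,
        sortCriterion != null ? toJsonString(sortCriterion) : null,
        querySize);
  }
}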
@@ -17,6 +17,8 @@
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;

import static com.datahub.util.RecordUtils.*;


@RequiredArgsConstructor
public class CachingEntitySearchService {
@@ -115,7 +117,8 @@ public SearchResult getCachedSearchResults(
cacheManager.getCache(ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME),
batchSize,
querySize -> getRawSearchResults(entityName, query, filters, sortCriterion, querySize.getFrom(), querySize.getSize()),
querySize -> Quintet.with(entityName, query, filters, sortCriterion, querySize), flags, enableCache).getSearchResults(from, size);
querySize -> Quintet.with(entityName, query, filters != null ? toJsonString(filters) : null,
sortCriterion != null ? toJsonString(sortCriterion) : null, querySize), flags, enableCache).getSearchResults(from, size);
}


@@ -133,16 +136,19 @@ public AutoCompleteResult getCachedAutoCompleteResults(
Cache cache = cacheManager.getCache(ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME);
AutoCompleteResult result;
if (enableCache(flags)) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "autocomplete_cache_access").time();
Object cacheKey = Quintet.with(entityName, input, field, filters, limit);
result = cache.get(cacheKey, AutoCompleteResult.class);
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "autocomplete_cache_miss").time();
result = getRawAutoCompleteResults(entityName, input, field, filters, limit);
cache.put(cacheKey, result);
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "autocomplete_cache_miss_count").inc();
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getCachedAutoCompleteResults_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "autocomplete_cache_access").time();
Object cacheKey = Quintet.with(entityName, input, field, filters != null ? toJsonString(filters) : null, limit);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(AutoCompleteResult.class, json) : null;
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "autocomplete_cache_miss").time();
result = getRawAutoCompleteResults(entityName, input, field, filters, limit);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "autocomplete_cache_miss_count").inc();
}
}
} else {
result = getRawAutoCompleteResults(entityName, input, field, filters, limit);
@@ -165,16 +171,19 @@ public BrowseResult getCachedBrowseResults(
Cache cache = cacheManager.getCache(ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME);
BrowseResult result;
if (enableCache(flags)) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "browse_cache_access").time();
Object cacheKey = Quintet.with(entityName, path, filters, from, size);
result = cache.get(cacheKey, BrowseResult.class);
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "browse_cache_miss").time();
result = getRawBrowseResults(entityName, path, filters, from, size);
cache.put(cacheKey, result);
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "browse_cache_miss_count").inc();
try (Timer.Context ignored2 = MetricUtils.timer(this.getClass(), "getCachedBrowseResults_cache").time()) {
Timer.Context cacheAccess = MetricUtils.timer(this.getClass(), "browse_cache_access").time();
Object cacheKey = Quintet.with(entityName, path, filters != null ? toJsonString(filters) : null, from, size);
String json = cache.get(cacheKey, String.class);
result = json != null ? toRecordTemplate(BrowseResult.class, json) : null;
cacheAccess.stop();
if (result == null) {
Timer.Context cacheMiss = MetricUtils.timer(this.getClass(), "browse_cache_miss").time();
result = getRawBrowseResults(entityName, path, filters, from, size);
cache.put(cacheKey, toJsonString(result));
cacheMiss.stop();
MetricUtils.counter(this.getClass(), "browse_cache_miss_count").inc();
}
}
} else {
result = getRawBrowseResults(entityName, path, filters, from, size);
@@ -0,0 +1,48 @@
package com.linkedin.metadata.search.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;


public class GZIPUtil {
private GZIPUtil() { }

public static String gzipDecompress(byte[] gzipped) {
String unzipped;
try (ByteArrayInputStream bis = new ByteArrayInputStream(gzipped);
GZIPInputStream gis = new GZIPInputStream(bis);
ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
int len;
while ((len = gis.read(buffer)) != -1) {
bos.write(buffer, 0, len);
}
unzipped = bos.toString(StandardCharsets.UTF_8);
} catch (IOException ie) {
throw new IllegalStateException("Error while unzipping value.", ie);
}
return unzipped;
}

public static byte[] gzipCompress(String unzipped) {
byte[] gzipped;
try (ByteArrayInputStream bis = new ByteArrayInputStream(unzipped.getBytes(StandardCharsets.UTF_8));
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(bos)) {
byte[] buffer = new byte[1024];
int len;
while ((len = bis.read(buffer)) != -1) {
gzipOutputStream.write(buffer, 0, len);
}
gzipOutputStream.finish();
gzipped = bos.toByteArray();
} catch (IOException ie) {
throw new IllegalStateException("Error while gzipping value: " + unzipped);
}
return gzipped;
}
}
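A quick round trip through the new helper doubles as a sanity check that compression is transparent to callers (this test-style snippet is illustrative, not part of the commit):

import com.linkedin.metadata.search.utils.GZIPUtil;

public final class GZIPUtilRoundTrip {
  private GZIPUtilRoundTrip() { }

  public static void main(String[] args) {
    String original = "{\"entities\":[]}";               // e.g. a cached JSON payload
    byte[] compressed = GZIPUtil.gzipCompress(original);  // smaller value to replicate
    String restored = GZIPUtil.gzipDecompress(compressed);
    if (!original.equals(restored)) {
      throw new AssertionError("gzip round trip should be lossless");
    }
  }
}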
3 changes: 3 additions & 0 deletions metadata-service/factories/build.gradle
@@ -15,6 +15,8 @@ dependencies {
compile externalDependency.elasticSearchRest
compile externalDependency.httpClient
compile externalDependency.gson
compile externalDependency.hazelcast
compile externalDependency.hazelcastSpring
compile externalDependency.kafkaClients
compile externalDependency.kafkaAvroSerde
compileOnly externalDependency.lombok
@@ -39,6 +41,7 @@ dependencies {

testCompile externalDependency.mockito
testCompile externalDependency.testng
testCompile externalDependency.hazelcastTest

}

@@ -0,0 +1,74 @@
package com.linkedin.gms.factory.common;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.hazelcast.config.Config;
import com.hazelcast.config.EvictionConfig;
import com.hazelcast.config.EvictionPolicy;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizePolicy;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.spring.cache.HazelcastCacheManager;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class CacheConfig {

@Value("${CACHE_TTL_SECONDS:600}")
private int cacheTtlSeconds;

@Value("${CACHE_MAX_SIZE:10000}")
private int cacheMaxSize;

@Value("${searchService.cache.hazelcast.serviceName:hazelcast-service}")
private String hazelcastServiceName;

@Bean
@ConditionalOnProperty(name = "searchService.cacheImplementation", havingValue = "caffeine")
public CacheManager caffeineCacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setCaffeine(caffeineCacheBuilder());
return cacheManager;
}

private Caffeine<Object, Object> caffeineCacheBuilder() {
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(cacheMaxSize)
.expireAfterAccess(cacheTtlSeconds, TimeUnit.SECONDS)
.recordStats();
}

@Bean
@ConditionalOnProperty(name = "searchService.cacheImplementation", havingValue = "hazelcast")
public CacheManager hazelcastCacheManager() {
Config config = new Config();
// TODO: This setting is equivalent to expireAfterAccess, refreshes timer after a get, put, containsKey etc.
// is this behavior what we actually desire? Should we change it now?
MapConfig mapConfig = new MapConfig().setMaxIdleSeconds(cacheTtlSeconds);

EvictionConfig evictionConfig = new EvictionConfig()
.setMaxSizePolicy(MaxSizePolicy.PER_NODE)
.setSize(cacheMaxSize)
.setEvictionPolicy(EvictionPolicy.LFU);
mapConfig.setEvictionConfig(evictionConfig);
mapConfig.setName("default");
config.addMapConfig(mapConfig);

config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
config.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true)
.setProperty("service-dns", hazelcastServiceName);


HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);

return new HazelcastCacheManager(hazelcastInstance);
}
}
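Both beans expose Spring's `CacheManager`, so callers such as `CachingEntitySearchService` stay implementation-agnostic: flipping `searchService.cacheImplementation` between `caffeine` and `hazelcast` changes where entries live, not how they are read or written. A minimal consumer sketch (illustrative; the cache name and stored type are assumptions):

import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;

final class CacheConsumerSketch {
  private final Cache cache;

  // Spring injects whichever CacheManager the property selected (Caffeine or Hazelcast).
  CacheConsumerSketch(CacheManager cacheManager) {
    this.cache = cacheManager.getCache("default"); // matches the "default" MapConfig above
  }

  // Values are written as plain strings (e.g. JSON), the convention used across this
  // commit, so either backend can hold and replicate them.
  void put(Object key, String jsonValue) {
    cache.put(key, jsonValue);
  }

  String get(Object key) {
    return cache.get(key, String.class);
  }
}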