Skip to content

Commit

Permalink
Optimize caching and add a flag to disable the Achilles cache
Browse files Browse the repository at this point in the history
fixes #2034

Set cdm.cache.cron.warming.enable = false by default
Made achilles_result_concept_count a required table.
Modified DDL population for the record count table
Records are cached from achilles_result_concept_count instead of achilles_results.

Co-authored-by: Chris Knoll <[email protected]>
  • Loading branch information
anthonysena and chrisknoll authored Feb 7, 2023
1 parent 00cfbfa commit 8adbdd3
Show file tree
Hide file tree
Showing 12 changed files with 357 additions and 692 deletions.
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,9 @@

<!-- Cache Config -->
<cdm.result.cache.warming.enable>true</cdm.result.cache.warming.enable>
<cdm.cache.achilles.warming.enable>false</cdm.cache.achilles.warming.enable>
<!-- Enable or disable warming cdm cache by cron -->
<cdm.cache.cron.warming.enable>true</cdm.cache.cron.warming.enable>
<cdm.cache.cron.warming.enable>false</cdm.cache.cron.warming.enable>
<!-- cron expression to warm cdm cache -->
<!-- cron expression format (asterisk means 'every' - '*' in seconds means 'every second') -->
<!-- default value is '0 0 2 * * *' which means "at 2am every day" -->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package org.ohdsi.webapi.cdmresults.service;

import java.util.ArrayList;
import java.util.Arrays;
import org.ohdsi.webapi.cdmresults.domain.CDMCacheEntity;
import org.ohdsi.webapi.cdmresults.repository.CDMCacheRepository;
import org.ohdsi.webapi.source.Source;
import org.springframework.data.jpa.repository.support.SimpleJpaRepository;
import org.springframework.stereotype.Repository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import javax.persistence.EntityManager;
import java.util.List;
import java.util.Optional;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

@Service
Expand All @@ -30,28 +31,35 @@ public List<CDMCacheEntity> save(Source source, List<CDMCacheEntity> entities) {
List<Integer> conceptIds = entities.stream()
.map(CDMCacheEntity::getConceptId)
.collect(Collectors.toList());
List<CDMCacheEntity> cacheEntities = cdmCacheRepository.findBySourceAndConceptIds(source.getSourceId(), conceptIds);
Map<Integer,CDMCacheEntity> cacheEntities = cdmCacheRepository.findBySourceAndConceptIds(source.getSourceId(), conceptIds)
.stream()
.collect(Collectors.toMap(CDMCacheEntity::getConceptId, Function.identity()));
List<CDMCacheEntity> modified = new ArrayList<>();
entities.forEach(entity -> {
// check if the entity with given cache name already exists
Optional<CDMCacheEntity> cacheEntity = cacheEntities.stream()
.filter(ce -> ce.getConceptId() == entity.getConceptId())
.findAny();
CDMCacheEntity processedEntity;
if (cacheEntity.isPresent()) {
processedEntity = cacheEntity.get();
if (cacheEntities.containsKey(entity.getConceptId())) {
processedEntity = cacheEntities.get(entity.getConceptId());
if (Arrays.equals(new long[] { entity.getPersonCount(),entity.getDescendantPersonCount(),entity.getRecordCount(),entity.getDescendantRecordCount()},
new long[] {processedEntity.getPersonCount(),processedEntity.getDescendantPersonCount(),processedEntity.getRecordCount(),processedEntity.getDescendantRecordCount()})) {
return; // data hasn't changed, so move to next in forEach
}
} else {
// if cache entity does not exist - create new one
processedEntity = new CDMCacheEntity();
processedEntity.setConceptId(entity.getConceptId());
processedEntity.setSourceId(source.getSourceId());
cacheEntities.add(processedEntity);
cacheEntities.put(processedEntity.getConceptId(), processedEntity);
}
processedEntity.setPersonCount(entity.getPersonCount());
processedEntity.setDescendantPersonCount(entity.getDescendantPersonCount());
processedEntity.setRecordCount(entity.getRecordCount());
processedEntity.setDescendantRecordCount(entity.getDescendantRecordCount());
modified.add(processedEntity);
});
cdmCacheRepository.save(cacheEntities);
return cacheEntities;
if (!modified.isEmpty()) {
cdmCacheRepository.save(modified);
}
return new ArrayList<>( cacheEntities.values());
}
}
357 changes: 185 additions & 172 deletions src/main/java/org/ohdsi/webapi/cdmresults/service/CDMCacheService.java

Large diffs are not rendered by default.

198 changes: 93 additions & 105 deletions src/main/java/org/ohdsi/webapi/service/CDMResultsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ public class CDMResultsService extends AbstractDaoService implements Initializin

@Value("${cdm.cache.cron.warming.enable}")
private boolean cdmCacheCronWarmingEnable;

@Value("${cdm.cache.achilles.warming.enable}")
private boolean cdmAchillesCacheWarmingEnable;

@Value("${cache.achilles.usePersonCount:false}")
private boolean usePersonCount;
Expand Down Expand Up @@ -144,24 +147,6 @@ public void scheduledWarmCaches(){
}
}

private void warmCaches(){
Collection<Source> sources = sourceService.getSources();
warmCaches(sources);

if (logger.isInfoEnabled()) {
List<String> sourceNames = sources
.stream()
.filter(s -> !SourceUtils.hasSourceDaimon(s, SourceDaimon.DaimonType.Vocabulary)
|| !SourceUtils.hasSourceDaimon(s, SourceDaimon.DaimonType.Results))
.map(Source::getSourceName)
.collect(Collectors.toList());
if (!sourceNames.isEmpty()) {
logger.info("Following sources do not have Vocabulary or Result schema and will not be cached: {}",
sourceNames.stream().collect(Collectors.joining(", ")));
}
}
}

/**
* Get the record count and descendant record count for one or more concepts in a single CDM database
*
Expand Down Expand Up @@ -298,21 +283,6 @@ public JobExecutionResource refreshCache(@PathParam("sourceKey") final String so
return new JobExecutionResource();
}

/*
@GET
@Path("{sourceKey}/refreshCacheUnsecured")
@Produces(MediaType.APPLICATION_JSON)
public JobExecutionResource refreshCacheUnsecured(@PathParam("sourceKey") final String sourceKey) {
Source source = getSourceRepository().findBySourceKey(sourceKey);
JobExecutionResource jobExecutionResource = jobService.findJobByName(Constants.WARM_CACHE, getWarmCacheJobName(String.valueOf(source.getSourceId()),sourceKey));
if (jobExecutionResource == null) {
if (source.getDaimons().stream().anyMatch(sd -> Objects.equals(sd.getDaimonType(), SourceDaimon.DaimonType.Results))) {
return warmCacheByKey(source.getSourceKey());
}
}
return new JobExecutionResource();
}
*/
/**
* Queries for data density report for the given sourceKey
*
Expand Down Expand Up @@ -431,6 +401,27 @@ private JobExecutionResource warmCacheByKey(String sourceKey) {
}
}

/**
 * Warms the CDM result caches for every configured source, then logs the
 * sources that will not be cached because they lack a Vocabulary or Results
 * daimon.
 */
private void warmCaches() {
    Collection<Source> sources = sourceService.getSources();
    warmCaches(sources);

    if (logger.isInfoEnabled()) {
        // Sources missing either the Vocabulary or the Results schema are skipped.
        List<String> sourceNames = sources
                .stream()
                .filter(s -> !SourceUtils.hasSourceDaimon(s, SourceDaimon.DaimonType.Vocabulary)
                        || !SourceUtils.hasSourceDaimon(s, SourceDaimon.DaimonType.Results))
                .map(Source::getSourceName)
                .collect(Collectors.toList());
        if (!sourceNames.isEmpty()) {
            // String.join is the direct idiom; no need to re-stream the list.
            logger.info("Following sources do not have Vocabulary or Result schema and will not be cached: {}",
                    String.join(", ", sourceNames));
        }
    }
}

/*
* Warm cache for a single source
*/
private JobExecutionResource warmCaches(Source source) {

if (!cdmResultCacheWarmingEnable) {
Expand All @@ -442,23 +433,23 @@ private JobExecutionResource warmCaches(Source source) {
return new JobExecutionResource();
}

String jobName = getWarmCacheJobName(String.valueOf(source.getSourceId()), source.getSourceKey());
Step resultsCacheStep = getCountStep(source, jobName);
Step achillesCacheStep = getAchillesStep(source, jobName);
SimpleJobBuilder builder = jobBuilders.get(jobName)
.start(achillesCacheStep);

/*
* Only run the results cache step if the results source has a
* priority >= 1
*/
if (getResultsDaimonPriority(source) > 0) {
builder = builder.next(resultsCacheStep);
int resultDaimonPriority = getResultsDaimonPriority(source);
if (!cdmAchillesCacheWarmingEnable && resultDaimonPriority <= 0) {
logger.info("Cache wouldn't be applied to sources with result daimon priority <= 0 AND when the Achilles cache is disabled, source [{}] was omitted", source.getSourceName());
return new JobExecutionResource();
}

return createJob(source.getSourceKey(), source.getSourceId(), jobName, builder);

String jobName = getWarmCacheJobName(String.valueOf(source.getSourceId()), source.getSourceKey());
List<Step> jobSteps = createCacheWarmingJobSteps(source, jobName);
SimpleJobBuilder builder = createJob(String.valueOf(source.getSourceId()),
source.getSourceKey(),
jobSteps);
return runJob(source.getSourceKey(), source.getSourceId(), jobName, builder);
}

/*
* Warm cache for a collection of sources
*/
private void warmCaches(Collection<Source> sources) {

if (!cdmResultCacheWarmingEnable) {
Expand All @@ -475,101 +466,65 @@ private void warmCaches(Collection<Source> sources) {
int bucketIndex = 0, counter = 0;
List<Integer> sourceIds = new ArrayList<>();
List<String> sourceKeys = new ArrayList<>();
List<Step> jobSteps = new ArrayList<>();
List<Step> allJobSteps = new ArrayList<>();
for (Source source : vocabularySources) {
sourceIds.add(source.getSourceId());
sourceKeys.add(source.getSourceKey());
String jobStepName = getWarmCacheJobName(String.valueOf(source.getSourceId()), source.getSourceKey());
String jobName = getWarmCacheJobName(String.valueOf(source.getSourceId()), source.getSourceKey());
// Check whether cache job for current source already exists
if (jobService.findJobByName(jobStepName, jobStepName) == null) {
if (jobService.findJobByName(jobName, jobName) == null) {
// Create the job step
Step jobStep = getJobStep(source, jobStepName);
List<Step> jobSteps = createCacheWarmingJobSteps(source, jobName);

// get priority of the results daimon
int priority = getResultsDaimonPriority(source);
// if source has results daimon with high priority - put it at the beginning of the queue
if (priority > 0) {
jobSteps.add(0, jobStep);
allJobSteps.addAll(0, jobSteps);
} else {
jobSteps.add(jobStep);
allJobSteps.addAll(jobSteps);
}
}

if (counter++ >= bucketSizes[bucketIndex] - 1) {
createJob(sourceIds.stream().map(String::valueOf).collect(Collectors.joining(",")),
String.join(",", sourceKeys),
jobSteps);
if (!allJobSteps.isEmpty()) {
SimpleJobBuilder builder = createJob(sourceIds.stream().map(String::valueOf).collect(Collectors.joining(",")),
String.join(",", sourceKeys),
allJobSteps);
runJob(source.getSourceKey(), source.getSourceId(), jobName, builder);
}

bucketIndex++;
counter = 0;
sourceIds.clear();
sourceKeys.clear();
jobSteps.clear();
allJobSteps.clear();
}
}
}

private Step getJobStep(Source source, String jobStepName) {
int resultDaimonPriority = getResultsDaimonPriority(source);
SimpleJob job = new SimpleJob(jobStepName);
job.setJobRepository(jobRepository);

job.addStep(getAchillesStep(source, jobStepName));
if (resultDaimonPriority > 0) {
job.addStep(getCountStep(source, jobStepName));
}

return stepBuilderFactory.get(jobStepName)
.job(job)
.parametersExtractor((job1, stepExecution) -> new JobParametersBuilder()
.addString(Constants.Params.JOB_NAME, jobStepName)
.addString(Constants.Params.SOURCE_KEY, source.getSourceKey())
.addString(Constants.Params.SOURCE_ID, String.valueOf(source.getSourceId()))
.addString(Constants.Params.JOB_AUTHOR, security.getSubject())
.addLong(JOB_START_TIME, System.currentTimeMillis())
.toJobParameters())
.build();
}

private void createJob(String sourceIds, String sourceKeys, List<Step> steps) {
private SimpleJobBuilder createJob(String sourceIds, String sourceKeys, List<Step> steps) {
final SimpleJobBuilder[] stepBuilder = {null};
String jobName = getWarmCacheJobName(sourceIds, sourceKeys);
if (jobService.findJobByName(jobName, jobName) == null && steps.size() > 0) {
if (jobService.findJobByName(jobName, jobName) == null && !steps.isEmpty()) {
JobBuilder jobBuilder = jobBuilders.get(jobName);

final SimpleJobBuilder[] stepBuilder = {null};

steps.forEach(step -> {
if (stepBuilder[0] != null) {
stepBuilder[0].next(step);
} else {
stepBuilder[0] = jobBuilder.start(step);
}
});

if (stepBuilder[0] != null) {
createJob(sourceKeys, -1, jobName, stepBuilder[0]);
}
}
}

private long[] getBucketSizes(List<Source> vocabularySources) {
int jobCount = cacheJobsCount;
long bucketSize, size = vocabularySources.size();
long[] bucketSizes = new long[cacheJobsCount];
// Get sizes of all buckets so that their values are approximately equal
while (jobCount > 0) {
if (jobCount > 1) {
bucketSize = Math.round(Math.floor(size * 1.0 / jobCount));
} else {
bucketSize = size;
}
bucketSizes[cacheJobsCount - jobCount] = bucketSize;
jobCount--;
size -= bucketSize;
}
return bucketSizes;
return stepBuilder[0];
}

private JobExecutionResource createJob(String sourceKey, int sourceId, String jobName, SimpleJobBuilder stepBuilder) {
/*
* Runs the job and returns the JobExecutionResource
*/
private JobExecutionResource runJob(String sourceKey, int sourceId, String jobName, SimpleJobBuilder stepBuilder) {
return jobService.runJob(stepBuilder.build(), new JobParametersBuilder()
.addString(Constants.Params.JOB_NAME, jobName)

Expand All @@ -579,6 +534,21 @@ private JobExecutionResource createJob(String sourceKey, int sourceId, String jo
.addString(Constants.Params.SOURCE_ID, String.valueOf(sourceId))
.toJobParameters());
}

/**
 * Builds the ordered list of cache-warming steps for a single source: the
 * Achilles cache step (only when Achilles cache warming is enabled) followed
 * by the record-count step (only when the Results daimon priority is >= 1).
 *
 * @param source  the source to warm
 * @param jobName name used for the generated steps
 * @return the steps to run, possibly empty
 */
private List<Step> createCacheWarmingJobSteps(Source source, String jobName) {
    // NOTE(review): the previous version constructed a SimpleJob and set its
    // JobRepository but never used it - dead code removed.
    List<Step> steps = new ArrayList<>();

    if (cdmAchillesCacheWarmingEnable) {
        steps.add(getAchillesStep(source, jobName));
    }
    if (getResultsDaimonPriority(source) > 0) {
        steps.add(getCountStep(source, jobName));
    }
    return steps;
}

private Step getAchillesStep(Source source, String jobStepName) {
CDMResultsService instance = applicationContext.getBean(CDMResultsService.class);
Expand Down Expand Up @@ -620,4 +590,22 @@ private String getWarmCacheJobName(String sourceIds, String sourceKeys) {
}
return jobName;
}

/**
 * Splits the sources into {@code cacheJobsCount} buckets of approximately
 * equal size; the final bucket absorbs any remainder.
 *
 * @param vocabularySources sources to distribute across warming jobs
 * @return one bucket size per job
 */
private long[] getBucketSizes(List<Source> vocabularySources) {
    int jobCount = cacheJobsCount;
    long size = vocabularySources.size();
    long[] bucketSizes = new long[cacheJobsCount];
    // Get sizes of all buckets so that their values are approximately equal
    while (jobCount > 0) {
        // size is non-negative, so long division floors exactly - no need for
        // Math.round(Math.floor(...)) via double, which is lossy for large values.
        long bucketSize = (jobCount > 1) ? size / jobCount : size;
        bucketSizes[cacheJobsCount - jobCount] = bucketSize;
        jobCount--;
        size -= bucketSize;
    }
    return bucketSizes;
}
}
1 change: 1 addition & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ analysis.result.zipVolumeSizeMb=${analysis.result.zipVolumeSizeMb}

#Cache Config
cdm.result.cache.warming.enable=${cdm.result.cache.warming.enable}
cdm.cache.achilles.warming.enable=${cdm.cache.achilles.warming.enable}
cdm.cache.cron.warming.enable=${cdm.cache.cron.warming.enable}
cdm.cache.cron.expression=${cdm.cache.cron.expression}

Expand Down
Loading

0 comments on commit 8adbdd3

Please sign in to comment.