Skip to content

Commit

Permalink
WebAPI v2.11 fails to start with many sources configured #2031 (#2032)
Browse files Browse the repository at this point in the history
* fails to start with many sources configured #2031

* Starting to add back the ability to control caching based on daimon priority

Co-authored-by: Anthony Sena <[email protected]>
(cherry picked from commit fad3dab)
  • Loading branch information
anton-abushkevich committed Jun 9, 2022
1 parent 5c3d899 commit 8785ab7
Showing 1 changed file with 33 additions and 19 deletions.
52 changes: 33 additions & 19 deletions src/main/java/org/ohdsi/webapi/service/CDMResultsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ public class CDMResultsService extends AbstractDaoService implements Initializin

@Override
public void afterPropertiesSet() throws Exception {

queryRunner.init(this.getSourceDialect(), objectMapper);
warmCaches();
}
Expand Down Expand Up @@ -243,7 +242,7 @@ public JobExecutionResource refreshCache(@PathParam("sourceKey") final String so
if(isSecured() && isAdmin()) {
Source source = getSourceRepository().findBySourceKey(sourceKey);
if (sourceAccessor.hasAccess(source)) {
JobExecutionResource jobExecutionResource = jobService.findJobByName(Constants.WARM_CACHE, getWarmCacheJobName(sourceKey));
JobExecutionResource jobExecutionResource = jobService.findJobByName(Constants.WARM_CACHE, getWarmCacheJobName(String.valueOf(source.getSourceId()),sourceKey));
if (jobExecutionResource == null) {
if (source.getDaimons().stream().anyMatch(sd -> Objects.equals(sd.getDaimonType(), SourceDaimon.DaimonType.Results))) {
return warmCacheByKey(source.getSourceKey());
Expand Down Expand Up @@ -381,8 +380,8 @@ public JsonNode getRawDrilldown(String domain, int conceptId, String sourceKey)
}

private JobExecutionResource warmCacheByKey(String sourceKey) {
if (jobService.findJobByName(getWarmCacheJobName(sourceKey), getWarmCacheJobName(sourceKey)) == null) {
Source source = getSourceRepository().findBySourceKey(sourceKey);
Source source = getSourceRepository().findBySourceKey(sourceKey);
if (jobService.findJobByName(getWarmCacheJobName(String.valueOf(source.getSourceId()), sourceKey), getWarmCacheJobName(String.valueOf(source.getSourceId()), sourceKey)) == null) {
return warmCaches(source);
} else {
return new JobExecutionResource();
Expand All @@ -399,14 +398,21 @@ private JobExecutionResource warmCaches(Source source) {
logger.info("Cache wouldn't be applied to sources without Vocabulary and Result schemas, source [{}] was omitted", source.getSourceName());
return new JobExecutionResource();
}

String jobName = getWarmCacheJobName(source.getSourceKey());
String jobName = getWarmCacheJobName(String.valueOf(source.getSourceId()), source.getSourceKey());
Step resultsCacheStep = getCountStep(source, jobName);
Step achillesCacheStep = getAchillesStep(source, jobName);

SimpleJobBuilder builder = jobBuilders.get(jobName)
.start(achillesCacheStep)
.next(resultsCacheStep);
.start(achillesCacheStep);

/*
* Only run the results cache step if the results source has a
* priority >= 1
*/
if (getResultsDaimonPriority(source) > 0) {
builder = builder.next(resultsCacheStep);
}

return createJob(source.getSourceKey(), source.getSourceId(), jobName, builder);
}

Expand All @@ -429,40 +435,45 @@ private void warmCaches(Collection<Source> sources) {
for (Source source : vocabularySources) {
sourceIds.add(source.getSourceId());
sourceKeys.add(source.getSourceKey());
String jobStepName = getWarmCacheJobName(source.getSourceKey());
String jobStepName = getWarmCacheJobName(String.valueOf(source.getSourceId()), source.getSourceKey());
// Check whether cache job for current source already exists
if (jobService.findJobByName(jobStepName, jobStepName) == null) {
// Create the job step
Step jobStep = getJobStep(source, jobStepName);

// get priority of the results daimon
int priority = getPriority(source);
int priority = getResultsDaimonPriority(source);
// if source has results daimon with high priority - put it at the beginning of the queue
if (priority == 1) {
if (priority > 0) {
jobSteps.add(0, jobStep);
} else {
jobSteps.add(jobStep);
}
}

if (counter++ >= bucketSizes[bucketIndex] - 1) {
createJob(sourceIds.stream().map(String::valueOf).collect(Collectors.joining(", ")),
createJob(sourceIds.stream().map(String::valueOf).collect(Collectors.joining(",")),
String.join(", ", sourceKeys),
jobSteps);

bucketIndex++;
counter = 0;
sourceIds.clear();
sourceKeys.clear();
jobSteps.clear();
}
}
}

private Step getJobStep(Source source, String jobStepName) {
int resultDaimonPriority = getResultsDaimonPriority(source);
SimpleJob job = new SimpleJob(jobStepName);
job.setJobRepository(jobRepository);

job.addStep(getAchillesStep(source, jobStepName));
job.addStep(getCountStep(source, jobStepName));
if (resultDaimonPriority > 0) {
job.addStep(getCountStep(source, jobStepName));
}

return stepBuilderFactory.get(jobStepName)
.job(job)
Expand Down Expand Up @@ -538,7 +549,7 @@ private Step getCountStep(Source source, String jobStepName) {
.build();
}

private int getPriority(Source source) {
private int getResultsDaimonPriority(Source source) {
Optional<Integer> resultsPriority = source.getDaimons().stream()
.filter(d -> d.getDaimonType().equals(SourceDaimon.DaimonType.Results))
.map(SourceDaimon::getPriority)
Expand All @@ -547,15 +558,18 @@ private int getPriority(Source source) {
return resultsPriority.orElse(0);
}

private String getWarmCacheJobName(String sourceKey) {
return String.format("warming cache: %s", sourceKey);
}

private String getWarmCacheJobName(String sourceIds, String sourceKeys) {
// for multiple sources: try to compose a job name from source keys, and if it is too long - use source ids
String jobName = String.format("warming cache: %s", sourceKeys);

if (jobName.length() >= 100) { // job name in batch_job_instance is varchar(100)
jobName = String.format("warming cache: %s", sourceIds);

if (jobName.length() >= 100) { // if we still have more than 100 symbols
jobName = jobName.substring(0, 88);
jobName = jobName.substring(0, jobName.lastIndexOf(','))
.concat(" and more..."); // todo: this is quick fix. need better solution
}
}
return jobName;
}
Expand Down

0 comments on commit 8785ab7

Please sign in to comment.