Skip to content

Commit

Permalink
improve suggester throughput (#4539)
Browse files Browse the repository at this point in the history
- eliminate the locking done at Suggester level
- use single thread pool for init/rebuild
- add tunable for the search pool parallelism

fixes #4538
fixes #3347
  • Loading branch information
vladak authored Feb 15, 2024
1 parent 500e28c commit d0c4c47
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

/*
* Copyright (c) 2018, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved.
* Portions Copyright (c) 2019, Chris Fraire <[email protected]>.
*/
package org.opengrok.indexer.configuration;
Expand Down Expand Up @@ -51,6 +51,7 @@ public class SuggesterConfig {
public static final int BUILD_TERMINATION_TIME_DEFAULT = 1800; // half an hour should be enough
public static final int TIME_THRESHOLD_DEFAULT = 2000; // 2 sec
public static final int REBUILD_THREAD_POOL_PERCENT_NCPUS_DEFAULT = 80;
public static final int SEARCH_THREAD_POOL_PERCENT_NCPUS_DEFAULT = 90;

private static final Set<String> allowedProjectsDefault = null;
private static final Set<String> allowedFieldsDefault = Set.of(
Expand Down Expand Up @@ -141,6 +142,11 @@ public class SuggesterConfig {
*/
private int rebuildThreadPoolSizeInNcpuPercent;

/**
* Number of threads used for search pool expressed in percent of available CPUs in the system.
*/
private int searchThreadPoolSizeInNcpuPercent;

public SuggesterConfig() {
setEnabled(ENABLED_DEFAULT);
setMaxResults(MAX_RESULTS_DEFAULT);
Expand All @@ -157,6 +163,7 @@ public SuggesterConfig() {
setRebuildCronConfig(REBUILD_CRON_CONFIG_DEFAULT);
setBuildTerminationTime(BUILD_TERMINATION_TIME_DEFAULT);
setRebuildThreadPoolSizeInNcpuPercent(REBUILD_THREAD_POOL_PERCENT_NCPUS_DEFAULT);
setSearchThreadPoolSizeInNcpuPercent(SEARCH_THREAD_POOL_PERCENT_NCPUS_DEFAULT);
}

public boolean isEnabled() {
Expand Down Expand Up @@ -302,6 +309,17 @@ public int getRebuildThreadPoolSizeInNcpuPercent() {
return rebuildThreadPoolSizeInNcpuPercent;
}

public void setSearchThreadPoolSizeInNcpuPercent(final int percent) {
if (percent < 0 || percent > 100) {
throw new IllegalArgumentException("Need percentage value");
}
this.searchThreadPoolSizeInNcpuPercent = percent;
}

public int getSearchThreadPoolSizeInNcpuPercent() {
return searchThreadPoolSizeInNcpuPercent;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -355,6 +373,8 @@ static SuggesterConfig getForHelp() {
res.setRebuildCronConfig("1 0 * * *");
res.setBuildTerminationTime(1 + res.getBuildTerminationTime());
res.setRebuildThreadPoolSizeInNcpuPercent(1 + res.getRebuildThreadPoolSizeInNcpuPercent());
res.setSearchThreadPoolSizeInNcpuPercent(1 + res.getSearchThreadPoolSizeInNcpuPercent());

return res;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

/*
* Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved.
*/
package org.opengrok.web.api.v1.suggester.provider.service.impl;

Expand All @@ -30,6 +30,7 @@
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.jetbrains.annotations.VisibleForTesting;
import org.opengrok.indexer.Metrics;
import org.opengrok.indexer.configuration.OpenGrokThreadFactory;
import org.opengrok.suggest.Suggester;
Expand Down Expand Up @@ -193,14 +194,14 @@ public void rebuild() {

/** {@inheritDoc} */
@Override
public void rebuild(final String project) {
Project p = env.getProjects().get(project);
if (p == null) {
public void rebuild(final String projectName) {
Project project = env.getProjects().get(projectName);
if (project == null) {
logger.log(Level.WARNING, "Cannot rebuild suggester because project for name {0} was not found",
project);
return;
}
if (!p.isIndexed()) {
if (!project.isIndexed()) {
logger.log(Level.WARNING, "Cannot rebuild project {0} because it is not indexed yet", project);
return;
}
Expand All @@ -210,7 +211,7 @@ public void rebuild(final String project) {
logger.log(Level.FINE, "Cannot rebuild {0} because suggester is not initialized", project);
return;
}
suggester.rebuild(Collections.singleton(getNamedIndexDir(p)));
suggester.rebuild(Collections.singleton(getNamedIndexDir(project)));
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -299,28 +300,32 @@ public List<Entry<BytesRef, Integer>> getPopularityData(
}
}

private int getParallelismLevel(int ncpuPercent, String prefix) {
int paralleismLevel = (int) (((float) ncpuPercent / 100) * Runtime.getRuntime().availableProcessors());
if (paralleismLevel == 0) {
paralleismLevel = 1;
}
logger.log(Level.FINER, "Suggester {0} parallelism level: {1}", new Object[]{prefix, paralleismLevel});

return paralleismLevel;
}

private void initSuggester() {
SuggesterConfig suggesterConfig = env.getSuggesterConfig();
if (!suggesterConfig.isEnabled()) {
logger.log(Level.INFO, "Suggester disabled");
return;
}

File suggesterDir = new File(env.getDataRootPath(), IndexDatabase.SUGGESTER_DIR);
int rebuildParalleismLevel = (int) (((float) suggesterConfig.getRebuildThreadPoolSizeInNcpuPercent() / 100)
* Runtime.getRuntime().availableProcessors());
if (rebuildParalleismLevel == 0) {
rebuildParalleismLevel = 1;
}
logger.log(Level.FINER, "Suggester rebuild parallelism level: {}", rebuildParalleismLevel);
suggester = new Suggester(suggesterDir,
suggester = new Suggester(new File(env.getDataRootPath(), IndexDatabase.SUGGESTER_DIR),
suggesterConfig.getMaxResults(),
Duration.ofSeconds(suggesterConfig.getBuildTerminationTime()),
suggesterConfig.isAllowMostPopular(),
env.isProjectsEnabled(),
suggesterConfig.getAllowedFields(),
suggesterConfig.getTimeThreshold(),
rebuildParalleismLevel,
getParallelismLevel(suggesterConfig.getRebuildThreadPoolSizeInNcpuPercent(), "rebuild"),
getParallelismLevel(suggesterConfig.getSearchThreadPoolSizeInNcpuPercent(), "search"),
Metrics.getRegistry(),
env.isPrintProgress());

Expand Down Expand Up @@ -405,10 +410,12 @@ private Duration getTimeToNextRebuild() {
return d.get();
}

@VisibleForTesting
public void waitForRebuild(long timeout, TimeUnit unit) throws InterruptedException {
suggester.waitForRebuild(timeout, unit);
}

@VisibleForTesting
public void waitForInit(long timeout, TimeUnit unit) throws InterruptedException {
suggester.waitForInit(timeout, unit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

/*
* Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved.
* Portions Copyright (c) 2019, 2020, Chris Fraire <[email protected]>.
*/
package org.opengrok.web.api.v1.controller;
Expand All @@ -33,7 +33,6 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opengrok.suggest.Suggester;
import org.opengrok.indexer.configuration.RuntimeEnvironment;
import org.opengrok.indexer.configuration.SuggesterConfig;
import org.opengrok.indexer.index.Indexer;
Expand All @@ -43,13 +42,10 @@
import org.opengrok.web.api.v1.suggester.provider.service.impl.SuggesterServiceImpl;

import java.io.File;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

Expand Down Expand Up @@ -91,23 +87,12 @@ public static void tearDownClass() {
}

@BeforeEach
void before() {
await().atMost(15, TimeUnit.SECONDS).until(() -> getSuggesterProjectDataSize() == 1);
void before() throws Exception {
SuggesterServiceImpl.getInstance().waitForInit(15, TimeUnit.SECONDS);

env.setSuggesterConfig(new SuggesterConfig());
}

private static int getSuggesterProjectDataSize() throws Exception {
Field f = SuggesterServiceImpl.class.getDeclaredField("suggester");
f.setAccessible(true);
Suggester suggester = (Suggester) f.get(SuggesterServiceImpl.getInstance());

Field f2 = Suggester.class.getDeclaredField("projectData");
f2.setAccessible(true);

return ((Map) f2.get(suggester)).size();
}

@Test
void suggestionsSimpleTest() {
SuggesterControllerTest.Result res = target(SuggesterController.PATH)
Expand Down
Loading

0 comments on commit d0c4c47

Please sign in to comment.