From d0c4c473ae9b7f6f09ee396aebe911a0d87af0f4 Mon Sep 17 00:00:00 2001 From: Vladimir Kotal Date: Thu, 15 Feb 2024 10:09:40 +0100 Subject: [PATCH] improve suggester throughput (#4539) - eliminate the locking done at Suggester level - use single thread pool for init/rebuild - add tunable for the search pool parallelism fixes #4538 fixes #3347 --- .../configuration/SuggesterConfig.java | 22 +- .../service/impl/SuggesterServiceImpl.java | 37 ++- ...ggesterControllerProjectsDisabledTest.java | 21 +- .../java/org/opengrok/suggest/Suggester.java | 267 +++++++++++------- .../suggest/SuggesterProjectData.java | 15 +- .../org/opengrok/suggest/SuggesterTest.java | 45 ++- 6 files changed, 237 insertions(+), 170 deletions(-) diff --git a/opengrok-indexer/src/main/java/org/opengrok/indexer/configuration/SuggesterConfig.java b/opengrok-indexer/src/main/java/org/opengrok/indexer/configuration/SuggesterConfig.java index 907f0c18ac6..359305b3bc1 100644 --- a/opengrok-indexer/src/main/java/org/opengrok/indexer/configuration/SuggesterConfig.java +++ b/opengrok-indexer/src/main/java/org/opengrok/indexer/configuration/SuggesterConfig.java @@ -18,7 +18,7 @@ */ /* - * Copyright (c) 2018, 2021, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. * Portions Copyright (c) 2019, Chris Fraire . */ package org.opengrok.indexer.configuration; @@ -51,6 +51,7 @@ public class SuggesterConfig { public static final int BUILD_TERMINATION_TIME_DEFAULT = 1800; // half an hour should be enough public static final int TIME_THRESHOLD_DEFAULT = 2000; // 2 sec public static final int REBUILD_THREAD_POOL_PERCENT_NCPUS_DEFAULT = 80; + public static final int SEARCH_THREAD_POOL_PERCENT_NCPUS_DEFAULT = 90; private static final Set allowedProjectsDefault = null; private static final Set allowedFieldsDefault = Set.of( @@ -141,6 +142,11 @@ public class SuggesterConfig { */ private int rebuildThreadPoolSizeInNcpuPercent; + /** + * Number of threads used for search pool expressed in percent of available CPUs in the system. + */ + private int searchThreadPoolSizeInNcpuPercent; + public SuggesterConfig() { setEnabled(ENABLED_DEFAULT); setMaxResults(MAX_RESULTS_DEFAULT); @@ -157,6 +163,7 @@ public SuggesterConfig() { setRebuildCronConfig(REBUILD_CRON_CONFIG_DEFAULT); setBuildTerminationTime(BUILD_TERMINATION_TIME_DEFAULT); setRebuildThreadPoolSizeInNcpuPercent(REBUILD_THREAD_POOL_PERCENT_NCPUS_DEFAULT); + setSearchThreadPoolSizeInNcpuPercent(SEARCH_THREAD_POOL_PERCENT_NCPUS_DEFAULT); } public boolean isEnabled() { @@ -302,6 +309,17 @@ public int getRebuildThreadPoolSizeInNcpuPercent() { return rebuildThreadPoolSizeInNcpuPercent; } + public void setSearchThreadPoolSizeInNcpuPercent(final int percent) { + if (percent < 0 || percent > 100) { + throw new IllegalArgumentException("Need percentage value"); + } + this.searchThreadPoolSizeInNcpuPercent = percent; + } + + public int getSearchThreadPoolSizeInNcpuPercent() { + return searchThreadPoolSizeInNcpuPercent; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -355,6 +373,8 @@ static SuggesterConfig getForHelp() { res.setRebuildCronConfig("1 0 * * *"); res.setBuildTerminationTime(1 + res.getBuildTerminationTime()); res.setRebuildThreadPoolSizeInNcpuPercent(1 + res.getRebuildThreadPoolSizeInNcpuPercent()); + res.setSearchThreadPoolSizeInNcpuPercent(1 + res.getSearchThreadPoolSizeInNcpuPercent()); + return res; } diff --git a/opengrok-web/src/main/java/org/opengrok/web/api/v1/suggester/provider/service/impl/SuggesterServiceImpl.java b/opengrok-web/src/main/java/org/opengrok/web/api/v1/suggester/provider/service/impl/SuggesterServiceImpl.java index f7f3ab5310b..423d48e262d 100644 --- a/opengrok-web/src/main/java/org/opengrok/web/api/v1/suggester/provider/service/impl/SuggesterServiceImpl.java +++ b/opengrok-web/src/main/java/org/opengrok/web/api/v1/suggester/provider/service/impl/SuggesterServiceImpl.java @@ -18,7 +18,7 @@ */ /* - * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. */ package org.opengrok.web.api.v1.suggester.provider.service.impl; @@ -30,6 +30,7 @@ import org.apache.lucene.index.Term; import org.apache.lucene.search.Query; import org.apache.lucene.util.BytesRef; +import org.jetbrains.annotations.VisibleForTesting; import org.opengrok.indexer.Metrics; import org.opengrok.indexer.configuration.OpenGrokThreadFactory; import org.opengrok.suggest.Suggester; @@ -193,14 +194,14 @@ public void rebuild() { /** {@inheritDoc} */ @Override - public void rebuild(final String project) { - Project p = env.getProjects().get(project); - if (p == null) { + public void rebuild(final String projectName) { + Project project = env.getProjects().get(projectName); + if (project == null) { logger.log(Level.WARNING, "Cannot rebuild suggester because project for name {0} was not found", project); return; } - if (!p.isIndexed()) { + if (!project.isIndexed()) { logger.log(Level.WARNING, "Cannot rebuild project {0} because it is not indexed yet", project); return; } @@ -210,7 +211,7 @@ public void rebuild(final String project) { logger.log(Level.FINE, "Cannot rebuild {0} because suggester is not initialized", project); return; } - suggester.rebuild(Collections.singleton(getNamedIndexDir(p))); + suggester.rebuild(Collections.singleton(getNamedIndexDir(project))); } finally { lock.readLock().unlock(); } @@ -299,6 +300,16 @@ public List> getPopularityData( } } + private int getParallelismLevel(int ncpuPercent, String prefix) { + int paralleismLevel = (int) (((float) ncpuPercent / 100) * Runtime.getRuntime().availableProcessors()); + if (paralleismLevel == 0) { + paralleismLevel = 1; + } + logger.log(Level.FINER, "Suggester {0} parallelism level: {1}", new Object[]{prefix, paralleismLevel}); + + return paralleismLevel; + } + private void initSuggester() { SuggesterConfig suggesterConfig = env.getSuggesterConfig(); if (!suggesterConfig.isEnabled()) { @@ -306,21 +317,15 @@ private void initSuggester() { return; } - File suggesterDir = new File(env.getDataRootPath(), IndexDatabase.SUGGESTER_DIR); - int rebuildParalleismLevel = (int) (((float) suggesterConfig.getRebuildThreadPoolSizeInNcpuPercent() / 100) - * Runtime.getRuntime().availableProcessors()); - if (rebuildParalleismLevel == 0) { - rebuildParalleismLevel = 1; - } - logger.log(Level.FINER, "Suggester rebuild parallelism level: {}", rebuildParalleismLevel); - suggester = new Suggester(suggesterDir, + suggester = new Suggester(new File(env.getDataRootPath(), IndexDatabase.SUGGESTER_DIR), suggesterConfig.getMaxResults(), Duration.ofSeconds(suggesterConfig.getBuildTerminationTime()), suggesterConfig.isAllowMostPopular(), env.isProjectsEnabled(), suggesterConfig.getAllowedFields(), suggesterConfig.getTimeThreshold(), - rebuildParalleismLevel, + getParallelismLevel(suggesterConfig.getRebuildThreadPoolSizeInNcpuPercent(), "rebuild"), + getParallelismLevel(suggesterConfig.getSearchThreadPoolSizeInNcpuPercent(), "search"), Metrics.getRegistry(), env.isPrintProgress()); @@ -405,10 +410,12 @@ private Duration getTimeToNextRebuild() { return d.get(); } + @VisibleForTesting public void waitForRebuild(long timeout, TimeUnit unit) throws InterruptedException { suggester.waitForRebuild(timeout, unit); } + @VisibleForTesting public void waitForInit(long timeout, TimeUnit unit) throws InterruptedException { suggester.waitForInit(timeout, unit); } diff --git a/opengrok-web/src/test/java/org/opengrok/web/api/v1/controller/SuggesterControllerProjectsDisabledTest.java b/opengrok-web/src/test/java/org/opengrok/web/api/v1/controller/SuggesterControllerProjectsDisabledTest.java index a70fcbebc51..6f8a5344229 100644 --- a/opengrok-web/src/test/java/org/opengrok/web/api/v1/controller/SuggesterControllerProjectsDisabledTest.java +++ b/opengrok-web/src/test/java/org/opengrok/web/api/v1/controller/SuggesterControllerProjectsDisabledTest.java @@ -18,7 +18,7 @@ */ /* - * Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. * Portions Copyright (c) 2019, 2020, Chris Fraire . */ package org.opengrok.web.api.v1.controller; @@ -33,7 +33,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.opengrok.suggest.Suggester; import org.opengrok.indexer.configuration.RuntimeEnvironment; import org.opengrok.indexer.configuration.SuggesterConfig; import org.opengrok.indexer.index.Indexer; @@ -43,13 +42,10 @@ import org.opengrok.web.api.v1.suggester.provider.service.impl.SuggesterServiceImpl; import java.io.File; -import java.lang.reflect.Field; import java.util.Collections; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -91,23 +87,12 @@ public static void tearDownClass() { } @BeforeEach - void before() { - await().atMost(15, TimeUnit.SECONDS).until(() -> getSuggesterProjectDataSize() == 1); + void before() throws Exception { + SuggesterServiceImpl.getInstance().waitForInit(15, TimeUnit.SECONDS); env.setSuggesterConfig(new SuggesterConfig()); } - private static int getSuggesterProjectDataSize() throws Exception { - Field f = SuggesterServiceImpl.class.getDeclaredField("suggester"); - f.setAccessible(true); - Suggester suggester = (Suggester) f.get(SuggesterServiceImpl.getInstance()); - - Field f2 = Suggester.class.getDeclaredField("projectData"); - f2.setAccessible(true); - - return ((Map) f2.get(suggester)).size(); - } - @Test void suggestionsSimpleTest() { SuggesterControllerTest.Result res = target(SuggesterController.PATH) diff --git a/suggester/src/main/java/org/opengrok/suggest/Suggester.java b/suggester/src/main/java/org/opengrok/suggest/Suggester.java index ab2b062499e..6f7073e4160 100644 --- a/suggester/src/main/java/org/opengrok/suggest/Suggester.java +++ b/suggester/src/main/java/org/opengrok/suggest/Suggester.java @@ -18,7 +18,7 @@ */ /* - * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. */ package org.opengrok.suggest; @@ -32,6 +32,8 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.BytesRef; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.VisibleForTesting; import org.opengrok.suggest.query.SuggesterPrefixQuery; import org.opengrok.suggest.query.SuggesterQuery; import org.opengrok.suggest.util.Progress; @@ -55,6 +57,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -76,9 +79,7 @@ public final class Suggester implements Closeable { private static final Logger LOGGER = Logger.getLogger(Suggester.class.getName()); - private final Map projectData = new ConcurrentHashMap<>(); - - private final Object lock = new Object(); + private final Map projectDataMap = new ConcurrentHashMap<>(); private final File suggesterDir; @@ -94,7 +95,6 @@ public final class Suggester implements Closeable { private final int timeThreshold; - private final int rebuildParallelismLevel; private final boolean isPrintProgress; private volatile boolean rebuilding; @@ -104,18 +104,12 @@ public final class Suggester implements Closeable { private final CountDownLatch initDone = new CountDownLatch(1); - private final Timer suggesterRebuildTimer; - private final Timer suggesterInitTimer; + private final Timer suggesterRebuildTimer; // per suggester data + private final Timer suggesterInitTimer; // per suggester data + + private final ExecutorService searchExecutorService; - // do NOT use fork join thread pool (work stealing thread pool) because it does not send interrupts upon cancellation - private final ExecutorService executorService = Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors(), - runnable -> { - Thread thread = Executors.defaultThreadFactory().newThread(runnable); - // This should match the naming in OpenGrokThreadFactory class. - thread.setName("OpenGrok-suggester-lookup-" + thread.getId()); - return thread; - }); + private final ExecutorService initRebuildExecutor; /** * @param suggesterDir directory under which the suggester data should be created @@ -127,6 +121,7 @@ public final class Suggester implements Closeable { * if {@code null} then enabled for all fields * @param timeThreshold time in milliseconds after which the suggestions requests should time out * @param rebuildParallelismLevel parallelism level for rebuild + * @param searchParallelismLevel parallelism level for search * @param registry meter registry * @param isPrintProgress whether to report progress for initialization and rebuild */ @@ -140,6 +135,7 @@ public Suggester( final Set allowedFields, final int timeThreshold, final int rebuildParallelismLevel, + final int searchParallelismLevel, MeterRegistry registry, boolean isPrintProgress) { if (suggesterDir == null) { @@ -158,9 +154,25 @@ public Suggester( this.projectsEnabled = projectsEnabled; this.allowedFields = new HashSet<>(allowedFields); this.timeThreshold = timeThreshold; - this.rebuildParallelismLevel = rebuildParallelismLevel; this.isPrintProgress = isPrintProgress; + // do NOT use fork join thread pool (work stealing thread pool) because it does not send interrupts upon cancellation + this.searchExecutorService = Executors.newFixedThreadPool(searchParallelismLevel, + runnable -> { + Thread thread = Executors.defaultThreadFactory().newThread(runnable); + // This should match the naming in OpenGrokThreadFactory class. + thread.setName("OpenGrok-suggester-lookup-" + thread.getId()); + return thread; + }); + + this.initRebuildExecutor = Executors.newFixedThreadPool(rebuildParallelismLevel, + runnable -> { + Thread thread = Executors.defaultThreadFactory().newThread(runnable); + // This should match the naming in OpenGrokThreadFactory class. + thread.setName("OpenGrok-suggester-rebuild-" + thread.getId()); + return thread; + }); + suggesterRebuildTimer = Timer.builder("suggester.rebuild.latency"). description("suggester rebuild latency"). register(registry); @@ -173,7 +185,7 @@ public Suggester( * Initializes suggester data for specified indexes. The data is initialized asynchronously. * @param luceneIndexes paths to Lucene indexes and name with which the index should be associated */ - public void init(final Collection luceneIndexes) { + public synchronized void init(final Collection luceneIndexes) { if (luceneIndexes == null || luceneIndexes.isEmpty()) { LOGGER.log(Level.INFO, "No index directories found, exiting..."); return; @@ -182,27 +194,63 @@ public void init(final Collection luceneIndexes) { throw new IllegalArgumentException("Projects are not enabled and multiple Lucene indexes were passed"); } - synchronized (lock) { - Instant start = Instant.now(); - LOGGER.log(Level.INFO, "Initializing suggester"); + Instant start = Instant.now(); + LOGGER.log(Level.INFO, "Initializing suggester for {0}", luceneIndexes); + try (Progress progress = new Progress(LOGGER, "suggester initialization", luceneIndexes.size(), + Level.INFO, isPrintProgress)) { - ExecutorService executor = Executors.newWorkStealingPool(rebuildParallelismLevel); + List> futures = new ArrayList<>(); + for (NamedIndexDir indexDir : luceneIndexes) { + if (terminating) { + LOGGER.log(Level.INFO, "Terminating suggester initialization"); + return; + } - try (Progress progress = new Progress(LOGGER, "suggester initialization", luceneIndexes.size(), - Level.INFO, isPrintProgress)) { - for (NamedIndexDir indexDir : luceneIndexes) { - if (terminating) { - LOGGER.log(Level.INFO, "Terminating suggester initialization"); - return; - } - submitInitIfIndexExists(executor, indexDir, progress); + SuggesterProjectData projectData = this.projectDataMap.computeIfAbsent(getProjectDataKey(indexDir), + dir -> createProjectData(indexDir)); + if (projectData == null) { + LOGGER.log(Level.WARNING, "failed to create project data for {0}", indexDir); + continue; } - shutdownAndAwaitTermination(executor, start, suggesterInitTimer, - "Suggester successfully initialized"); - initDone.countDown(); + submitInitIfIndexExists(initRebuildExecutor, indexDir, projectData, progress).ifPresent(futures::add); } + + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.log(Level.WARNING, "cannot retrieve suggester init result", e); + } + } + + initDone.countDown(); + Duration duration = Duration.between(start, Instant.now()); + LOGGER.log(Level.INFO, "{0} (took {1})", new Object[]{"Suggesters for " + luceneIndexes + " were initialized", + DurationFormatUtils.formatDurationWords(duration.toMillis(), + true, true)}); + } + } + + private String getProjectDataKey(NamedIndexDir indexDir) { + if (projectsEnabled) { + return indexDir.name; + } else { + return PROJECTS_DISABLED_KEY; + } + } + + @Nullable + private SuggesterProjectData createProjectData(NamedIndexDir indexDir) { + SuggesterProjectData projectData = null; + try { + projectData = new SuggesterProjectData(FSDirectory.open(indexDir.path), + getSuggesterDir(indexDir.name), allowMostPopular, allowedFields); + } catch (IOException e) { + LOGGER.log(Level.WARNING, String.format("creating suggester data for %s failed", indexDir), e); + return null; } + return projectData; } /** @@ -211,26 +259,38 @@ public void init(final Collection luceneIndexes) { * @param unit timeout unit * @throws InterruptedException on canceled await() */ + @VisibleForTesting public void waitForInit(long timeout, TimeUnit unit) throws InterruptedException { if (!initDone.await(timeout, unit)) { LOGGER.log(Level.WARNING, "Initialization did not finish in {0} {1}", new Object[] {timeout, unit}); } } - private void submitInitIfIndexExists(final ExecutorService executorService, final NamedIndexDir indexDir, - Progress progress) { + private Optional> submitInitIfIndexExists(final ExecutorService executorService, + final NamedIndexDir indexDir, final SuggesterProjectData data, + Progress progress) { try { if (indexExists(indexDir.path)) { - executorService.submit(getInitRunnable(indexDir, progress)); + return Optional.of(executorService.submit(getInitRunnable(data, progress))); } else { - LOGGER.log(Level.FINE, "Index in ''{0}'' directory does not exist, skipping...", indexDir); + LOGGER.log(Level.FINE, "Index in ''{0}'' directory does not exist, skipping...", data); } } catch (IOException e) { - LOGGER.log(Level.WARNING, String.format("Could not check if index in '%s' exists", indexDir), e); + LOGGER.log(Level.WARNING, String.format("Could not check if index in '%s' exists", data), e); + } + return Optional.empty(); + } + + @Nullable + private SuggesterProjectData getProjectData(final String name) { + if (projectsEnabled) { + return projectDataMap.get(name); + } else { + return projectDataMap.get(PROJECTS_DISABLED_KEY); } } - private Runnable getInitRunnable(final NamedIndexDir indexDir, Progress progress) { + private Runnable getInitRunnable(final SuggesterProjectData data, Progress progress) { return () -> { try { if (terminating) { @@ -238,23 +298,15 @@ private Runnable getInitRunnable(final NamedIndexDir indexDir, Progress progress } Instant start = Instant.now(); - LOGGER.log(Level.FINE, "Initializing suggester data in ''{0}''", indexDir); - - SuggesterProjectData wfst = new SuggesterProjectData(FSDirectory.open(indexDir.path), - getSuggesterDir(indexDir.name), allowMostPopular, allowedFields); - wfst.init(); - if (projectsEnabled) { - projectData.put(indexDir.name, wfst); - } else { - projectData.put(PROJECTS_DISABLED_KEY, wfst); - } - - Duration d = Duration.between(start, Instant.now()); + LOGGER.log(Level.FINE, "Initializing suggester data in ''{0}''", data); + data.init(); + Duration duration = Duration.between(start, Instant.now()); + suggesterInitTimer.record(duration); LOGGER.log(Level.FINE, "Finished initialization of suggester data in ''{0}'', took {1}", - new Object[] {indexDir, d}); + new Object[] {data, duration}); progress.increment(); } catch (Exception e) { - LOGGER.log(Level.SEVERE, String.format("Could not initialize suggester data for '%s'", indexDir), e); + LOGGER.log(Level.SEVERE, String.format("Could not initialize suggester data for '%s'", data), e); } }; } @@ -273,17 +325,13 @@ private boolean indexExists(final Path indexDir) throws IOException { } } - private void shutdownAndAwaitTermination(final ExecutorService executorService, Instant start, - Timer timer, - final String logMessageOnSuccess) { + private void shutdownAndAwaitTermination(final ExecutorService executorService) { executorService.shutdown(); try { - executorService.awaitTermination(awaitTerminationTime.toMillis(), TimeUnit.MILLISECONDS); - Duration duration = Duration.between(start, Instant.now()); - timer.record(duration); - LOGGER.log(Level.INFO, "{0} (took {1})", new Object[]{logMessageOnSuccess, - DurationFormatUtils.formatDurationWords(duration.toMillis(), - true, true)}); + if (!executorService.awaitTermination(awaitTerminationTime.toMillis(), TimeUnit.MILLISECONDS)) { + LOGGER.log(Level.WARNING, "executor was shutdown with timeout", executorService); + } + executorService.shutdownNow(); } catch (InterruptedException e) { LOGGER.log(Level.SEVERE, "Interrupted while building suggesters", e); Thread.currentThread().interrupt(); @@ -304,30 +352,40 @@ public void rebuild(final Collection indexDirs) { rebuilding = true; rebuildLock.unlock(); - synchronized (lock) { - if (terminating) { - return; - } - - Instant start = Instant.now(); - LOGGER.log(Level.INFO, "Rebuilding the following suggesters: {0}", indexDirs); + if (terminating) { + return; + } - ExecutorService executor = Executors.newWorkStealingPool(rebuildParallelismLevel); + Instant start = Instant.now(); + LOGGER.log(Level.INFO, "Rebuilding the following suggesters: {0}", indexDirs); - try (Progress progress = new Progress(LOGGER, "suggester rebuild", indexDirs.size(), - Level.INFO, isPrintProgress)) { - for (NamedIndexDir indexDir : indexDirs) { - SuggesterProjectData data = this.projectData.get(indexDir.name); - if (data != null) { - executor.submit(getRebuildRunnable(data, progress)); + try (Progress progress = new Progress(LOGGER, "suggester rebuild", indexDirs.size(), + Level.INFO, isPrintProgress)) { + List> futures = new ArrayList<>(); + for (NamedIndexDir indexDir : indexDirs) { + SuggesterProjectData projectData = this.projectDataMap.computeIfAbsent(getProjectDataKey(indexDir), + dir -> createProjectData(indexDir)); + if (projectData != null) { + if (projectData.isInitialized()) { + futures.add(initRebuildExecutor.submit(getRebuildRunnable(projectData, progress))); } else { - submitInitIfIndexExists(executor, indexDir, progress); + submitInitIfIndexExists(initRebuildExecutor, indexDir, projectData, progress).ifPresent(futures::add); } } + } - shutdownAndAwaitTermination(executor, start, suggesterRebuildTimer, - "Suggesters for " + indexDirs + " were successfully rebuilt"); + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + LOGGER.log(Level.WARNING, "cannot retrieve suggester rebuild result", e); + } } + + Duration duration = Duration.between(start, Instant.now()); + LOGGER.log(Level.INFO, "{0} (took {1})", new Object[]{"Suggesters for " + indexDirs + " were rebuilt", + DurationFormatUtils.formatDurationWords(duration.toMillis(), + true, true)}); } rebuildLock.lock(); @@ -368,9 +426,9 @@ private Runnable getRebuildRunnable(final SuggesterProjectData data, Progress pr Instant start = Instant.now(); LOGGER.log(Level.FINE, "Rebuilding {0}", data); data.rebuild(); - - Duration d = Duration.between(start, Instant.now()); - LOGGER.log(Level.FINE, "Rebuild of {0} finished, took {1}", new Object[] {data, d}); + Duration duration = Duration.between(start, Instant.now()); + suggesterRebuildTimer.record(duration); + LOGGER.log(Level.FINE, "Rebuild of {0} finished, took {1}", new Object[] {data, duration}); progress.increment(); } catch (Exception e) { LOGGER.log(Level.SEVERE, "Could not rebuild suggester", e); @@ -387,18 +445,15 @@ public void remove(final Iterable names) { return; } - synchronized (lock) { - LOGGER.log(Level.INFO, "Removing following suggesters: {0}", names); - - for (String suggesterName : names) { - SuggesterProjectData collection = projectData.get(suggesterName); - if (collection == null) { - LOGGER.log(Level.WARNING, "Unknown suggester {0}", suggesterName); - continue; - } - collection.remove(); - projectData.remove(suggesterName); + LOGGER.log(Level.INFO, "Removing following suggesters: {0}", names); + for (String suggesterName : names) { + SuggesterProjectData collection = projectDataMap.get(suggesterName); + if (collection == null) { + LOGGER.log(Level.WARNING, "Unknown suggester {0}", suggesterName); + continue; } + projectDataMap.remove(suggesterName); + collection.remove(); } } @@ -442,7 +497,7 @@ private Suggestions prefixLookup( BooleanWrapper partialResult = new BooleanWrapper(); List results = readers.parallelStream().flatMap(namedIndexReader -> { - SuggesterProjectData data = projectData.get(namedIndexReader.name); + SuggesterProjectData data = projectDataMap.get(namedIndexReader.name); if (data == null) { LOGGER.log(Level.FINE, "{0} not yet initialized", namedIndexReader.name); partialResult.value = true; @@ -481,7 +536,7 @@ private Suggestions complexLookup( List> futures; try { - futures = executorService.invokeAll(searchTasks, timeThreshold, TimeUnit.MILLISECONDS); + futures = searchExecutorService.invokeAll(searchTasks, timeThreshold, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOGGER.log(Level.WARNING, "Interrupted while invoking suggester search", e); Thread.currentThread().interrupt(); @@ -537,7 +592,7 @@ public void onSearch(final Iterable projects, final Query query) { } private void incrementSearchCount(List terms, final String projectDataKey) { - Optional.ofNullable(projectData.get(projectDataKey)) + Optional.ofNullable(projectDataMap.get(projectDataKey)) .ifPresent(data -> terms.forEach(data::incrementSearchCount)); } @@ -576,20 +631,15 @@ public boolean increaseSearchCount(final String project, final Term term, final if (!allowMostPopular) { return false; } - SuggesterProjectData data; - if (!projectsEnabled) { - data = projectData.get(PROJECTS_DISABLED_KEY); - } else { - data = projectData.get(project); - } - if (data == null) { + SuggesterProjectData projectData = getProjectData(project); + if (projectData == null) { LOGGER.log(Level.WARNING, "Cannot update search count because of missing suggester data{}", projectsEnabled ? " for project " + project : ""); return false; } - return data.incrementSearchCount(term, value, waitForLock); + return projectData.incrementSearchCount(term, value, waitForLock); } /** @@ -606,7 +656,7 @@ public List> getSearchCounts( final int page, final int pageSize ) { - SuggesterProjectData data = projectData.get(project); + SuggesterProjectData data = projectDataMap.get(project); if (data == null) { LOGGER.log(Level.FINE, "Cannot retrieve search counts because suggester data for project {0} was not found", project); @@ -628,8 +678,9 @@ public void terminate() { */ @Override public void close() { - executorService.shutdownNow(); - projectData.values().forEach(f -> { + searchExecutorService.shutdownNow(); + shutdownAndAwaitTermination(initRebuildExecutor); + projectDataMap.values().forEach(f -> { try { f.close(); } catch (IOException e) { @@ -665,7 +716,7 @@ public Void call() { try { started = true; - SuggesterProjectData data = projectData.get(namedIndexReader.name); + SuggesterProjectData data = projectDataMap.get(namedIndexReader.name); if (data == null) { LOGGER.log(Level.FINE, "{0} not yet initialized", namedIndexReader.name); return null; diff --git a/suggester/src/main/java/org/opengrok/suggest/SuggesterProjectData.java b/suggester/src/main/java/org/opengrok/suggest/SuggesterProjectData.java index 42f15287b54..4fda2b8786e 100644 --- a/suggester/src/main/java/org/opengrok/suggest/SuggesterProjectData.java +++ b/suggester/src/main/java/org/opengrok/suggest/SuggesterProjectData.java @@ -18,7 +18,7 @@ */ /* - * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. */ package org.opengrok.suggest; @@ -105,6 +105,8 @@ class SuggesterProjectData implements Closeable { private final Directory tempDir; + private boolean initialized; // Whether init() was called. + SuggesterProjectData( final Directory indexDir, final Path suggesterDir, @@ -163,6 +165,8 @@ public void init() throws IOException { } storeDataVersion(commitVersion); + + initialized = true; } finally { lock.writeLock().unlock(); } @@ -249,6 +253,15 @@ public void rebuild() throws IOException { } } + public boolean isInitialized() { + lock.writeLock().lock(); + try { + return initialized; + } finally { + lock.writeLock().unlock(); + } + } + private void build() throws IOException { try (IndexReader indexReader = DirectoryReader.open(indexDir)) { for (String field : fields) { diff --git a/suggester/src/test/java/org/opengrok/suggest/SuggesterTest.java b/suggester/src/test/java/org/opengrok/suggest/SuggesterTest.java index f35285a4cd2..16199fbacd1 100644 --- a/suggester/src/test/java/org/opengrok/suggest/SuggesterTest.java +++ b/suggester/src/test/java/org/opengrok/suggest/SuggesterTest.java @@ -18,7 +18,7 @@ */ /* - * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. */ package org.opengrok.suggest; @@ -51,13 +51,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; @@ -108,7 +106,7 @@ void testNullSuggesterDir() { var terminationDuration = Duration.ofMinutes(5); assertThrows(IllegalArgumentException.class, () -> new Suggester(null, 10, terminationDuration, false, - true, null, Integer.MAX_VALUE, 1, registry, + true, null, Integer.MAX_VALUE, 1, 1, registry, false)); } @@ -129,14 +127,14 @@ private void createSuggester(Long duration) throws IOException { .orElse(null); try { new Suggester(tempFile.toFile(), 10, objDuration, false, - true, null, Integer.MAX_VALUE, 1, registry, + true, null, Integer.MAX_VALUE, 1, 1, registry, false); } finally { tempFile.toFile().delete(); } } - private SuggesterTestData initSuggester() throws IOException { + private SuggesterTestData initSuggester() throws Exception { Path tempIndexDir = Files.createTempDirectory("opengrok"); Directory dir = FSDirectory.open(tempIndexDir); @@ -148,11 +146,11 @@ private SuggesterTestData initSuggester() throws IOException { Suggester s = new Suggester(tempSuggesterDir.toFile(), 10, Duration.ofMinutes(1), true, true, Collections.singleton("test"), Integer.MAX_VALUE, - Runtime.getRuntime().availableProcessors(), registry, false); + Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), + registry, false); s.init(Collections.singleton(new Suggester.NamedIndexDir("test", tempIndexDir))); - - await().atMost(2, TimeUnit.SECONDS).until(() -> getSuggesterProjectDataSize(s) == 1); + s.waitForInit(2, TimeUnit.SECONDS); SuggesterTestData testData = new SuggesterTestData(); testData.s = s; @@ -171,15 +169,8 @@ private void addText(final Directory dir, final String text) throws IOException } } - private static int getSuggesterProjectDataSize(final Suggester suggester) throws Exception { - java.lang.reflect.Field f2 = Suggester.class.getDeclaredField("projectData"); - f2.setAccessible(true); - - return ((Map) f2.get(suggester)).size(); - } - @Test - void testSimpleSuggestions() throws IOException { + void testSimpleSuggestions() throws Exception { SuggesterTestData t = initSuggester(); Suggester.NamedIndexReader ir = t.getNamedIndexReader(); @@ -194,7 +185,7 @@ void testSimpleSuggestions() throws IOException { } @Test - void testRefresh() throws IOException { + void testRefresh() throws Exception { SuggesterTestData t = initSuggester(); addText(t.getIndexDirectory(), "a1 a2"); @@ -213,7 +204,7 @@ void testRefresh() throws IOException { } @Test - void testIndexChangedWhileOffline() throws IOException { + void testIndexChangedWhileOffline() throws Exception { SuggesterTestData t = initSuggester(); t.s.close(); @@ -222,11 +213,11 @@ void testIndexChangedWhileOffline() throws IOException { t.s = new Suggester(t.suggesterDir.toFile(), 10, Duration.ofMinutes(1), false, true, Collections.singleton("test"), Integer.MAX_VALUE, - Runtime.getRuntime().availableProcessors(), registry, false); + Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), + registry, false); t.s.init(Collections.singleton(t.getNamedIndexDir())); - - await().atMost(2, TimeUnit.SECONDS).until(() -> getSuggesterProjectDataSize(t.s) == 1); + t.s.waitForInit(2, TimeUnit.SECONDS); Suggester.NamedIndexReader ir = t.getNamedIndexReader(); @@ -240,7 +231,7 @@ void testIndexChangedWhileOffline() throws IOException { } @Test - void testRemove() throws IOException { + void testRemove() throws Exception { SuggesterTestData t = initSuggester(); t.s.remove(Collections.singleton("test")); @@ -252,7 +243,7 @@ void testRemove() throws IOException { } @Test - void testComplexQuerySearch() throws IOException { + void testComplexQuerySearch() throws Exception { SuggesterTestData t = initSuggester(); List res = t.s.search(Collections.singletonList(t.getNamedIndexReader()), @@ -266,7 +257,7 @@ void testComplexQuerySearch() throws IOException { @Test @SuppressWarnings("unchecked") // for contains() - void testOnSearch() throws IOException { + void testOnSearch() throws Exception { SuggesterTestData t = initSuggester(); Query q = new BooleanQuery.Builder() @@ -285,7 +276,7 @@ void testOnSearch() throws IOException { } @Test - void testGetSearchCountsForUnknown() throws IOException { + void testGetSearchCountsForUnknown() throws Exception { SuggesterTestData t = initSuggester(); assertTrue(t.s.getSearchCounts("unknown", "unknown", 0, 10).isEmpty()); @@ -295,7 +286,7 @@ void testGetSearchCountsForUnknown() throws IOException { @Test @SuppressWarnings("unchecked") // for contains() - void testIncreaseSearchCount() throws IOException { + void testIncreaseSearchCount() throws Exception { SuggesterTestData t = initSuggester(); t.s.increaseSearchCount("test", new Term("test", "term2"), 100, true);