Skip to content

Commit

Permalink
Add allowlist setting for search-pipeline-common processors
Browse files Browse the repository at this point in the history
Add a new static setting that lets an operator choose specific search
pipeline processors to enable by name. The behavior is as follows:

- If the allowlist setting is not defined, all installed processors are
  enabled. This is the status quo.
- If the allowlist setting is defined as the empty set, then all processors
  are disabled.
- If the allowlist setting contains the names of valid processors, only those
  processors are enabled.
- If the allowlist setting contains a name of a processor that does not exist,
  then the server will fail to start with an IllegalStateException
  listing which processors were defined in the allowlist but are not
  installed.
- If the allowlist setting is changed between server restarts then any
  ingest pipeline using a now-disabled processor will fail. This is the
  same experience if a pipeline used a processor defined by a plugin but
  then that plugin were to be uninstalled across restarts.

A distinct setting exists for each of request, response, and search phase
results processors.

Related to #14439

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross committed Jun 26, 2024
1 parent a99b494 commit bc8ecc7
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private Map<String, Processor.Factory> filterForAllowlistSetting(Settings settin
// Assert that no unknown processors are defined in the allowlist
final Set<String> unknownAllowlistProcessors = allowlist.stream()
.filter(p -> map.containsKey(p) == false)
.collect(Collectors.toSet());
.collect(Collectors.toUnmodifiableSet());
if (unknownAllowlistProcessors.isEmpty() == false) {
throw new IllegalArgumentException(
"Processor(s) "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,61 @@

package org.opensearch.search.pipeline.common;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Plugin providing common search request/response processors for use in search pipelines.
*/
public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPipelinePlugin {

static final Setting<List<String>> REQUEST_PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
"search.pipeline.common.request.processors.allowed",
List.of(),
Function.identity(),
Setting.Property.NodeScope
);

static final Setting<List<String>> RESPONSE_PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
"search.pipeline.common.response.processors.allowed",
List.of(),
Function.identity(),
Setting.Property.NodeScope
);

static final Setting<List<String>> SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
"search.pipeline.common.search.phase.results.processors.allowed",
List.of(),
Function.identity(),
Setting.Property.NodeScope
);

/**
* No constructor needed, but build complains if we don't have a constructor with JavaDoc.
*/
public SearchPipelineCommonModulePlugin() {}

@Override
public List<Setting<?>> getSettings() {
return List.of(

Check warning on line 59 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java#L59

Added line #L59 was not covered by tests
REQUEST_PROCESSORS_ALLOWLIST_SETTING,
RESPONSE_PROCESSORS_ALLOWLIST_SETTING,
SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING
);
}

/**
* Returns a map of processor factories.
*
Expand All @@ -34,25 +71,62 @@ public SearchPipelineCommonModulePlugin() {}
*/
@Override
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Parameters parameters) {
return Map.of(
FilterQueryRequestProcessor.TYPE,
new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry),
ScriptRequestProcessor.TYPE,
new ScriptRequestProcessor.Factory(parameters.scriptService),
OversampleRequestProcessor.TYPE,
new OversampleRequestProcessor.Factory()
return filterForAllowlistSetting(
REQUEST_PROCESSORS_ALLOWLIST_SETTING,
parameters.env.settings(),
Map.of(
FilterQueryRequestProcessor.TYPE,
new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry),
ScriptRequestProcessor.TYPE,
new ScriptRequestProcessor.Factory(parameters.scriptService),
OversampleRequestProcessor.TYPE,
new OversampleRequestProcessor.Factory()
)
);
}

@Override
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Parameters parameters) {
return Map.of(
RenameFieldResponseProcessor.TYPE,
new RenameFieldResponseProcessor.Factory(),
TruncateHitsResponseProcessor.TYPE,
new TruncateHitsResponseProcessor.Factory(),
CollapseResponseProcessor.TYPE,
new CollapseResponseProcessor.Factory()
return filterForAllowlistSetting(
RESPONSE_PROCESSORS_ALLOWLIST_SETTING,
parameters.env.settings(),
Map.of(
RenameFieldResponseProcessor.TYPE,
new RenameFieldResponseProcessor.Factory(),
TruncateHitsResponseProcessor.TYPE,
new TruncateHitsResponseProcessor.Factory(),
CollapseResponseProcessor.TYPE,
new CollapseResponseProcessor.Factory()
)
);
}

@Override
public Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(Parameters parameters) {
return filterForAllowlistSetting(SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING, parameters.env.settings(), Map.of());
}

private <T extends Processor> Map<String, Processor.Factory<T>> filterForAllowlistSetting(
Setting<List<String>> allowlistSetting,
Settings settings,
Map<String, Processor.Factory<T>> map
) {
if (allowlistSetting.exists(settings) == false) {
return Map.copyOf(map);
}
final Set<String> allowlist = Set.copyOf(allowlistSetting.get(settings));
// Assert that no unknown processors are defined in the allowlist
final Set<String> unknownAllowlistProcessors = allowlist.stream()
.filter(p -> map.containsKey(p) == false)
.collect(Collectors.toUnmodifiableSet());
if (unknownAllowlistProcessors.isEmpty() == false) {
throw new IllegalArgumentException(
"Processor(s) " + unknownAllowlistProcessors + " were defined in [" + allowlistSetting.getKey() + "] but do not exist"
);
}
return map.entrySet()
.stream()
.filter(e -> allowlist.contains(e.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.common.settings.Settings;
import org.opensearch.env.TestEnvironment;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

public class SearchPipelineCommonModulePluginTests extends OpenSearchTestCase {

public void testRequestProcessorAllowlist() throws IOException {
final String key = SearchPipelineCommonModulePlugin.REQUEST_PROCESSORS_ALLOWLIST_SETTING.getKey();
runAllowlistTest(key, List.of(), SearchPipelineCommonModulePlugin::getRequestProcessors);
runAllowlistTest(key, List.of("filter_query"), SearchPipelineCommonModulePlugin::getRequestProcessors);
runAllowlistTest(key, List.of("script"), SearchPipelineCommonModulePlugin::getRequestProcessors);
runAllowlistTest(key, List.of("oversample", "script"), SearchPipelineCommonModulePlugin::getRequestProcessors);
runAllowlistTest(key, List.of("filter_query", "script", "oversample"), SearchPipelineCommonModulePlugin::getRequestProcessors);

final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> runAllowlistTest(key, List.of("foo"), SearchPipelineCommonModulePlugin::getRequestProcessors)
);
assertTrue(e.getMessage(), e.getMessage().contains("foo"));
}

public void testResponseProcessorAllowlist() throws IOException {
final String key = SearchPipelineCommonModulePlugin.RESPONSE_PROCESSORS_ALLOWLIST_SETTING.getKey();
runAllowlistTest(key, List.of(), SearchPipelineCommonModulePlugin::getResponseProcessors);
runAllowlistTest(key, List.of("rename_field"), SearchPipelineCommonModulePlugin::getResponseProcessors);
runAllowlistTest(key, List.of("truncate_hits"), SearchPipelineCommonModulePlugin::getResponseProcessors);
runAllowlistTest(key, List.of("collapse", "truncate_hits"), SearchPipelineCommonModulePlugin::getResponseProcessors);
runAllowlistTest(
key,
List.of("rename_field", "truncate_hits", "collapse"),
SearchPipelineCommonModulePlugin::getResponseProcessors
);

final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> runAllowlistTest(key, List.of("foo"), SearchPipelineCommonModulePlugin::getResponseProcessors)
);
assertTrue(e.getMessage(), e.getMessage().contains("foo"));
}

public void testSearchPhaseResultsProcessorAllowlist() throws IOException {
final String key = SearchPipelineCommonModulePlugin.SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING.getKey();
runAllowlistTest(key, List.of(), SearchPipelineCommonModulePlugin::getSearchPhaseResultsProcessors);

final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> runAllowlistTest(key, List.of("foo"), SearchPipelineCommonModulePlugin::getSearchPhaseResultsProcessors)
);
assertTrue(e.getMessage(), e.getMessage().contains("foo"));
}

private void runAllowlistTest(
String settingKey,
List<String> allowlist,
BiFunction<SearchPipelineCommonModulePlugin, SearchPipelinePlugin.Parameters, Map<String, ?>> function
) throws IOException {
final Settings settings = Settings.builder().putList(settingKey, allowlist).build();
try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) {
assertEquals(Set.copyOf(allowlist), function.apply(plugin, createParameters(settings)).keySet());
}
}

public void testAllowlistNotSpecified() throws IOException {
final Settings settings = Settings.EMPTY;
try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) {
assertEquals(Set.of("oversample", "filter_query", "script"), plugin.getRequestProcessors(createParameters(settings)).keySet());
assertEquals(
Set.of("rename_field", "truncate_hits", "collapse"),
plugin.getResponseProcessors(createParameters(settings)).keySet()
);
assertEquals(Set.of(), plugin.getSearchPhaseResultsProcessors(createParameters(settings)).keySet());
}
}

private static SearchPipelinePlugin.Parameters createParameters(Settings settings) {
return new SearchPipelinePlugin.Parameters(
TestEnvironment.newEnvironment(Settings.builder().put(settings).put("path.home", "").build()),
null,
null,
null,
() -> 0L,
(a, b) -> null,
null,
null,
$ -> {},
null
);
}
}

0 comments on commit bc8ecc7

Please sign in to comment.