Skip to content

Commit

Permalink
Merge branch 'main' into bug-fix-synonym
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Froh <[email protected]>
  • Loading branch information
msfroh authored Nov 11, 2024
2 parents 8f09919 + c9edb48 commit 0c33516
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
- Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583))
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Add a flag in QueryShardContext to differentiate inner hit query ([#16600](https://github.com/opensearch-project/OpenSearch/pull/16600))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.25.1 to 12.27.1 ([#16521](https://github.com/opensearch-project/OpenSearch/pull/16521))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,187 @@

package org.opensearch.search.pipeline.common;

import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineResponse;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.ingest.PipelineConfiguration;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@OpenSearchIntegTestCase.SuiteScopeTestCase
public class SearchPipelineCommonIT extends OpenSearchIntegTestCase {

private static final String TEST_INDEX = "myindex";
private static final String PIPELINE_NAME = "test_pipeline";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(SearchPipelineCommonModulePlugin.class);
}

@Before
public void setup() throws Exception {
createIndex(TEST_INDEX);

IndexRequest doc1 = new IndexRequest(TEST_INDEX).id("doc1").source(Map.of("field", "value"));
IndexRequest doc2 = new IndexRequest(TEST_INDEX).id("doc2").source(Map.of("field", "something else"));

IndexResponse ir = client().index(doc1).actionGet();
assertSame(RestStatus.CREATED, ir.status());
ir = client().index(doc2).actionGet();
assertSame(RestStatus.CREATED, ir.status());

RefreshResponse refRsp = client().admin().indices().refresh(new RefreshRequest(TEST_INDEX)).actionGet();
assertSame(RestStatus.OK, refRsp.getStatus());
}

@After
public void cleanup() throws Exception {
internalCluster().wipeIndices(TEST_INDEX);
}

public void testFilterQuery() {
// Create a pipeline with a filter_query processor.
String pipelineName = "foo";
createPipeline();

// Search without the pipeline. Should see both documents.
SearchRequest req = new SearchRequest(TEST_INDEX).source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()));
SearchResponse rsp = client().search(req).actionGet();
assertEquals(2, rsp.getHits().getTotalHits().value);

// Search with the pipeline. Should only see document with "field":"value".
req.pipeline(PIPELINE_NAME);
rsp = client().search(req).actionGet();
assertEquals(1, rsp.getHits().getTotalHits().value);

// Clean up.
deletePipeline();
}

public void testSearchWithTemporaryPipeline() throws Exception {

// Search without the pipeline. Should see both documents.
SearchRequest req = new SearchRequest(TEST_INDEX).source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()));
SearchResponse rsp = client().search(req).actionGet();
assertEquals(2, rsp.getHits().getTotalHits().value);

// Search with temporary pipeline
Map<String, Object> pipelineSourceMap = new HashMap<>();
Map<String, Object> requestProcessorConfig = new HashMap<>();

Map<String, Object> filterQuery = new HashMap<>();
filterQuery.put("query", Map.of("term", Map.of("field", "value")));
requestProcessorConfig.put("filter_query", filterQuery);
pipelineSourceMap.put("request_processors", List.of(requestProcessorConfig));

req = new SearchRequest(TEST_INDEX).source(
new SearchSourceBuilder().query(new MatchAllQueryBuilder()).searchPipelineSource(pipelineSourceMap)
);

SearchResponse rspWithTempPipeline = client().search(req).actionGet();
assertEquals(1, rspWithTempPipeline.getHits().getTotalHits().value);
}

public void testSearchWithDefaultPipeline() throws Exception {
// Create pipeline
createPipeline();

// Search without the pipeline. Should see both documents.
SearchRequest req = new SearchRequest(TEST_INDEX).source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()));
SearchResponse rsp = client().search(req).actionGet();
assertEquals(2, rsp.getHits().getTotalHits().value);

// Set pipeline as default for the index
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(TEST_INDEX);
updateSettingsRequest.settings(Settings.builder().put("index.search.default_pipeline", PIPELINE_NAME));
AcknowledgedResponse updateSettingsResponse = client().admin().indices().updateSettings(updateSettingsRequest).actionGet();
assertTrue(updateSettingsResponse.isAcknowledged());

// Search with the default pipeline. Should only see document with "field":"value".
rsp = client().search(req).actionGet();
assertEquals(1, rsp.getHits().getTotalHits().value);

// Clean up: Remove default pipeline setting
updateSettingsRequest = new UpdateSettingsRequest(TEST_INDEX);
updateSettingsRequest.settings(Settings.builder().putNull("index.search.default_pipeline"));
updateSettingsResponse = client().admin().indices().updateSettings(updateSettingsRequest).actionGet();
assertTrue(updateSettingsResponse.isAcknowledged());

// Clean up.
deletePipeline();
}

public void testUpdateSearchPipeline() throws Exception {
// Create initial pipeline
createPipeline();

// Verify initial pipeline
SearchRequest req = new SearchRequest(TEST_INDEX).source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()));
req.pipeline(PIPELINE_NAME);
SearchResponse initialRsp = client().search(req).actionGet();
assertEquals(1, initialRsp.getHits().getTotalHits().value);

BytesReference pipelineConfig = new BytesArray(
"{"
+ "\"description\": \"Updated pipeline\","
+ "\"request_processors\": ["
+ "{"
+ "\"filter_query\" : {"
+ "\"query\": {"
+ "\"term\" : {"
+ "\"field\" : \"something else\""
+ "}"
+ "}"
+ "}"
+ "}"
+ "]"
+ "}"
);

PipelineConfiguration pipeline = new PipelineConfiguration(PIPELINE_NAME, pipelineConfig, MediaTypeRegistry.JSON);

// Update pipeline
PutSearchPipelineRequest updateRequest = new PutSearchPipelineRequest(pipeline.getId(), pipelineConfig, MediaTypeRegistry.JSON);
AcknowledgedResponse ackRsp = client().admin().cluster().putSearchPipeline(updateRequest).actionGet();
assertTrue(ackRsp.isAcknowledged());

// Verify pipeline description
GetSearchPipelineResponse getPipelineResponse = client().admin()
.cluster()
.getSearchPipeline(new GetSearchPipelineRequest(PIPELINE_NAME))
.actionGet();
assertEquals(PIPELINE_NAME, getPipelineResponse.pipelines().get(0).getId());
assertEquals(pipeline.getConfigAsMap(), getPipelineResponse.pipelines().get(0).getConfigAsMap());
// Clean up.
deletePipeline();
}

private void createPipeline() {
PutSearchPipelineRequest putSearchPipelineRequest = new PutSearchPipelineRequest(
pipelineName,
PIPELINE_NAME,
new BytesArray(
"{"
+ "\"request_processors\": ["
Expand All @@ -62,35 +208,13 @@ public void testFilterQuery() {
);
AcknowledgedResponse ackRsp = client().admin().cluster().putSearchPipeline(putSearchPipelineRequest).actionGet();
assertTrue(ackRsp.isAcknowledged());
}

// Index some documents.
String indexName = "myindex";
IndexRequest doc1 = new IndexRequest(indexName).id("doc1").source(Map.of("field", "value"));
IndexRequest doc2 = new IndexRequest(indexName).id("doc2").source(Map.of("field", "something else"));

IndexResponse ir = client().index(doc1).actionGet();
assertSame(RestStatus.CREATED, ir.status());
ir = client().index(doc2).actionGet();
assertSame(RestStatus.CREATED, ir.status());

// Refresh so the documents are visible to search.
RefreshResponse refRsp = client().admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
assertSame(RestStatus.OK, refRsp.getStatus());

// Search without the pipeline. Should see both documents.
SearchRequest req = new SearchRequest(indexName).source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()));
SearchResponse rsp = client().search(req).actionGet();
assertEquals(2, rsp.getHits().getTotalHits().value);

// Search with the pipeline. Should only see document with "field":"value".
req.pipeline(pipelineName);
rsp = client().search(req).actionGet();
assertEquals(1, rsp.getHits().getTotalHits().value);

// Clean up.
ackRsp = client().admin().cluster().deleteSearchPipeline(new DeleteSearchPipelineRequest(pipelineName)).actionGet();
assertTrue(ackRsp.isAcknowledged());
ackRsp = client().admin().indices().delete(new DeleteIndexRequest(indexName)).actionGet();
private void deletePipeline() {
AcknowledgedResponse ackRsp = client().admin()
.cluster()
.deleteSearchPipeline(new DeleteSearchPipelineRequest(PIPELINE_NAME))
.actionGet();
assertTrue(ackRsp.isAcknowledged());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ protected void doBuild(SearchContext parentSearchContext, InnerHitsContext inner
try {
queryShardContext.setParentFilter(parentFilter);
queryShardContext.nestedScope().nextLevel(nestedObjectMapper);
queryShardContext.setInnerHitQuery(true);
try {
NestedInnerHitSubContext nestedInnerHits = new NestedInnerHitSubContext(
name,
Expand All @@ -427,6 +428,7 @@ protected void doBuild(SearchContext parentSearchContext, InnerHitsContext inner
}
} finally {
queryShardContext.setParentFilter(previousParentFilter);
queryShardContext.setInnerHitQuery(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public class QueryShardContext extends QueryRewriteContext {
private BitSetProducer parentFilter;
private DerivedFieldResolver derivedFieldResolver;
private boolean keywordIndexOrDocValuesEnabled;
private boolean isInnerHitQuery;

public QueryShardContext(
int shardId,
Expand Down Expand Up @@ -727,4 +728,12 @@ public BitSetProducer getParentFilter() {
public void setParentFilter(BitSetProducer parentFilter) {
this.parentFilter = parentFilter;
}

public boolean isInnerHitQuery() {
return isInnerHitQuery;
}

public void setInnerHitQuery(boolean isInnerHitQuery) {
this.isInnerHitQuery = isInnerHitQuery;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ public void testParentFilterFromInlineLeafInnerHitsNestedQuery() throws Exceptio
if (context.getParentFilter() == null) {
throw new Exception("Expect parent filter to be non-null");
}
if (context.isInnerHitQuery() == false) {
throw new Exception("Expect it to be inner hit query");
}
return invoke.callRealMethod();
});
NestedQueryBuilder query = new NestedQueryBuilder("nested1", innerQueryBuilder, ScoreMode.None);
Expand All @@ -345,6 +348,7 @@ public void testParentFilterFromInlineLeafInnerHitsNestedQuery() throws Exceptio
assertThat(innerHitBuilders.size(), Matchers.equalTo(1));
assertTrue(innerHitBuilders.containsKey(leafInnerHits.getName()));
assertNull(queryShardContext.getParentFilter());
assertFalse(queryShardContext.isInnerHitQuery());
innerHitBuilders.get(leafInnerHits.getName()).build(searchContext, innerHitsContext);
assertNull(queryShardContext.getParentFilter());
verify(innerQueryBuilder).toQuery(queryShardContext);
Expand Down

0 comments on commit 0c33516

Please sign in to comment.