-
Notifications
You must be signed in to change notification settings - Fork 141
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Integrates KNN plugin with ConcurrentSearchRequestDecider interface
This allows knn queries to enable concurrency when index.search.concurrent_segment_search.mode or search.concurrent_segment_search.mode in auto mode. Without this the default behavior of auto mode is non-concurrent search Signed-off-by: Tejas Shah <[email protected]>
- Loading branch information
Showing
5 changed files
with
270 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
58 changes: 58 additions & 0 deletions
58
src/main/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDecider.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.plugin.search; | ||
|
||
import lombok.EqualsAndHashCode; | ||
import org.opensearch.index.IndexSettings; | ||
import org.opensearch.index.query.QueryBuilder; | ||
import org.opensearch.knn.index.query.KNNQueryBuilder; | ||
import org.opensearch.search.deciders.ConcurrentSearchDecision; | ||
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; | ||
|
||
import java.util.Optional; | ||
|
||
/** | ||
* Decides if the knn query uses concurrent search | ||
* As of 2.17, this is only used when | ||
* - "index.search.concurrent_segment_search.mode": "auto" or | ||
* - "search.concurrent_segment_search.mode": "auto" | ||
* | ||
* Note: the class is not thread-safe and a new instance needs to be created for each request | ||
*/ | ||
@EqualsAndHashCode(callSuper = true) | ||
public class KNNConcurrentSearchRequestDecider extends ConcurrentSearchRequestDecider { | ||
|
||
private static final ConcurrentSearchDecision DEFAULT_KNN_DECISION = new ConcurrentSearchDecision( | ||
ConcurrentSearchDecision.DecisionStatus.NO_OP, | ||
"Default decision" | ||
); | ||
private static final ConcurrentSearchDecision YES = new ConcurrentSearchDecision( | ||
ConcurrentSearchDecision.DecisionStatus.YES, | ||
"Enable concurrent search for knn" | ||
); | ||
|
||
private ConcurrentSearchDecision knnDecision = DEFAULT_KNN_DECISION; | ||
|
||
@Override | ||
public void evaluateForQuery(final QueryBuilder queryBuilder, final IndexSettings indexSettings) { | ||
if (queryBuilder instanceof KNNQueryBuilder) { | ||
knnDecision = YES; | ||
} else { | ||
knnDecision = DEFAULT_KNN_DECISION; | ||
} | ||
} | ||
|
||
@Override | ||
public ConcurrentSearchDecision getConcurrentSearchDecision() { | ||
return knnDecision; | ||
} | ||
|
||
public static class Factory implements ConcurrentSearchRequestDecider.Factory { | ||
public Optional<ConcurrentSearchRequestDecider> create(IndexSettings indexSettings) { | ||
return Optional.of(new KNNConcurrentSearchRequestDecider()); | ||
} | ||
} | ||
} |
151 changes: 151 additions & 0 deletions
151
src/test/java/org/opensearch/knn/integ/search/ConcurrentSegmentSearchIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.integ.search; | ||
|
||
import com.google.common.primitives.Floats; | ||
import lombok.SneakyThrows; | ||
import org.apache.hc.core5.http.io.entity.EntityUtils; | ||
import org.junit.BeforeClass; | ||
import org.opensearch.client.Response; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.xcontent.XContentFactory; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.knn.KNNRestTestCase; | ||
import org.opensearch.knn.KNNResult; | ||
import org.opensearch.knn.TestUtils; | ||
import org.opensearch.knn.common.KNNConstants; | ||
import org.opensearch.knn.index.SpaceType; | ||
import org.opensearch.knn.index.engine.KNNEngine; | ||
import org.opensearch.knn.index.query.KNNQueryBuilder; | ||
import org.opensearch.knn.plugin.script.KNNScoringUtil; | ||
|
||
import java.io.IOException; | ||
import java.net.URL; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.TreeMap; | ||
|
||
import static org.opensearch.knn.common.KNNConstants.KNN_ENGINE; | ||
import static org.opensearch.knn.common.KNNConstants.METHOD_HNSW; | ||
import static org.opensearch.knn.common.KNNConstants.METHOD_PARAMETER_SPACE_TYPE; | ||
import static org.opensearch.knn.common.KNNConstants.NAME; | ||
import static org.opensearch.knn.common.KNNConstants.PARAMETERS; | ||
|
||
/** | ||
* Note that this is simply a sanity test to make sure that concurrent search code path is hit E2E and scores are intact | ||
* There is no latency verification as it can be better encapsulated in nightly runs. | ||
*/ | ||
public class ConcurrentSegmentSearchIT extends KNNRestTestCase { | ||
|
||
static TestUtils.TestData testData; | ||
|
||
@BeforeClass | ||
public static void setUpClass() throws IOException { | ||
if (ConcurrentSegmentSearchIT.class.getClassLoader() == null) { | ||
throw new IllegalStateException("ClassLoader of FaissIT Class is null"); | ||
} | ||
URL testIndexVectors = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_vectors_1000x128.json"); | ||
URL testQueries = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_queries_100x128.csv"); | ||
assert testIndexVectors != null; | ||
assert testQueries != null; | ||
testData = new TestUtils.TestData(testIndexVectors.getPath(), testQueries.getPath()); | ||
} | ||
|
||
@SneakyThrows | ||
public void testConcurrentSegmentSearch() { | ||
String indexName = "test-concurrent-segment"; | ||
String fieldName = "test-field-1"; | ||
int dimension = testData.indexData.vectors[0].length; | ||
final XContentBuilder indexBuilder = createFaissHnswIndexMapping(fieldName, dimension); | ||
Map<String, Object> mappingMap = xContentBuilderToMap(indexBuilder); | ||
String mapping = indexBuilder.toString(); | ||
createKnnIndex(indexName, mapping); | ||
assertEquals(new TreeMap<>(mappingMap), new TreeMap<>(getIndexMappingAsMap(indexName))); | ||
|
||
// Index the test data | ||
for (int i = 0; i < testData.indexData.docs.length; i++) { | ||
addKnnDoc( | ||
indexName, | ||
Integer.toString(testData.indexData.docs[i]), | ||
fieldName, | ||
Floats.asList(testData.indexData.vectors[i]).toArray() | ||
); | ||
} | ||
refreshAllNonSystemIndices(); | ||
updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "auto")); | ||
|
||
// Test search queries | ||
int k = 10; | ||
verifySearch(indexName, fieldName, k); | ||
|
||
updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "auto")); | ||
verifySearch(indexName, fieldName, k); | ||
} | ||
|
||
/* | ||
{ | ||
"properties": { | ||
"<fieldName>": { | ||
"type": "knn_vector", | ||
"dimension": <dimension>, | ||
"method": { | ||
"name": "hnsw", | ||
"space_type": "l2", | ||
"engine": "faiss", | ||
"parameters": { | ||
"m": 16, | ||
"ef_construction": 128, | ||
"ef_search": 128 | ||
} | ||
} | ||
} | ||
} | ||
*/ | ||
@SneakyThrows | ||
private XContentBuilder createFaissHnswIndexMapping(String fieldName, int dimension) { | ||
// Create an index | ||
return XContentFactory.jsonBuilder() | ||
.startObject() | ||
.startObject("properties") | ||
.startObject(fieldName) | ||
.field("type", "knn_vector") | ||
.field("dimension", dimension) | ||
.startObject(KNNConstants.KNN_METHOD) | ||
.field(NAME, METHOD_HNSW) | ||
.field(METHOD_PARAMETER_SPACE_TYPE, SpaceType.L2.getValue()) | ||
.field(KNN_ENGINE, KNNEngine.FAISS.getName()) | ||
.startObject(PARAMETERS) | ||
.field(KNNConstants.METHOD_PARAMETER_M, 16) | ||
.field(KNNConstants.METHOD_PARAMETER_EF_CONSTRUCTION, 128) | ||
.field(KNNConstants.METHOD_PARAMETER_EF_SEARCH, 128) | ||
.endObject() | ||
.endObject() | ||
.endObject() | ||
.endObject() | ||
.endObject(); | ||
} | ||
|
||
@SneakyThrows | ||
private void verifySearch(String indexName, String fieldName, int k) { | ||
for (int i = 0; i < testData.queries.length; i++) { | ||
final KNNQueryBuilder queryBuilder = KNNQueryBuilder.builder().fieldName(fieldName).vector(testData.queries[i]).k(k).build(); | ||
Response response = searchKNNIndex(indexName, queryBuilder, k); | ||
String responseBody = EntityUtils.toString(response.getEntity()); | ||
List<KNNResult> knnResults = parseSearchResponse(responseBody, fieldName); | ||
assertEquals(k, knnResults.size()); | ||
|
||
List<Float> actualScores = parseSearchResponseScore(responseBody, fieldName); | ||
for (int j = 0; j < k; j++) { | ||
float[] primitiveArray = knnResults.get(j).getVector(); | ||
assertEquals( | ||
KNNEngine.FAISS.score(KNNScoringUtil.l2Squared(testData.queries[i], primitiveArray), SpaceType.L2), | ||
actualScores.get(j), | ||
0.0001 | ||
); | ||
} | ||
} | ||
} | ||
} |
53 changes: 53 additions & 0 deletions
53
src/test/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDeciderTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.plugin.search; | ||
|
||
import org.opensearch.index.IndexSettings; | ||
import org.opensearch.index.query.MatchAllQueryBuilder; | ||
import org.opensearch.knn.index.query.KNNQueryBuilder; | ||
import org.opensearch.search.deciders.ConcurrentSearchDecision; | ||
import org.opensearch.test.OpenSearchTestCase; | ||
|
||
import static org.mockito.Mockito.mock; | ||
|
||
public class KNNConcurrentSearchRequestDeciderTests extends OpenSearchTestCase { | ||
|
||
public void testDecider() { | ||
ConcurrentSearchDecision noop = new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "Default decision"); | ||
|
||
KNNConcurrentSearchRequestDecider decider = new KNNConcurrentSearchRequestDecider(); | ||
assertDecision(noop, decider.getConcurrentSearchDecision()); | ||
|
||
// Non KNNQueryBuilder | ||
decider.evaluateForQuery(new MatchAllQueryBuilder(), mock(IndexSettings.class)); | ||
assertDecision(noop, decider.getConcurrentSearchDecision()); | ||
|
||
decider.evaluateForQuery( | ||
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(), | ||
mock(IndexSettings.class) | ||
); | ||
ConcurrentSearchDecision yes = new ConcurrentSearchDecision( | ||
ConcurrentSearchDecision.DecisionStatus.YES, | ||
"Enable concurrent search for knn" | ||
); | ||
assertDecision(yes, decider.getConcurrentSearchDecision()); | ||
|
||
decider.evaluateForQuery(new MatchAllQueryBuilder(), mock(IndexSettings.class)); | ||
assertDecision(noop, decider.getConcurrentSearchDecision()); | ||
} | ||
|
||
public void testDeciderFactory() { | ||
KNNConcurrentSearchRequestDecider.Factory factory = new KNNConcurrentSearchRequestDecider.Factory(); | ||
IndexSettings indexSettings = mock(IndexSettings.class); | ||
assertNotSame(factory.create(indexSettings).get(), factory.create(indexSettings).get()); | ||
} | ||
|
||
private void assertDecision(ConcurrentSearchDecision expected, ConcurrentSearchDecision actual) { | ||
assertEquals(expected.getDecisionReason(), actual.getDecisionReason()); | ||
assertEquals(expected.getDecisionStatus(), actual.getDecisionStatus()); | ||
} | ||
|
||
} |