
Add reindex integration tests (#1075)
* Add reindex integration tests

Signed-off-by: Andy Qin <[email protected]>
q-andy authored Jan 8, 2025
1 parent fea0a7f commit 972acb6
Showing 6 changed files with 225 additions and 183 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -31,4 +31,5 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Infrastructure
### Documentation
### Maintenance
- Add reindex integration tests for ingest processors ([#1075](https://github.com/opensearch-project/neural-search/pull/1075))
### Refactoring
SparseEncodingProcessIT.java
@@ -4,20 +4,11 @@
*/
package org.opensearch.neuralsearch.processor;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicHeader;
import org.junit.Before;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.neuralsearch.BaseNeuralSearchIT;

import com.google.common.collect.ImmutableList;
import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder;

public class SparseEncodingProcessIT extends BaseNeuralSearchIT {
@@ -26,6 +17,20 @@ public class SparseEncodingProcessIT extends BaseNeuralSearchIT {

private static final String PIPELINE_NAME = "pipeline-sparse-encoding";

private static final String INGEST_DOCUMENT = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"favor_list\": [\n"
+ " \"test\",\n"
+ " \"hello\",\n"
+ " \"mock\"\n"
+ " ],\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";

@Before
public void setUp() throws Exception {
super.setUp();
@@ -37,8 +42,8 @@ public void testSparseEncodingProcessor() throws Exception {
try {
modelId = prepareSparseEncodingModel();
createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING);
createSparseEncodingIndex();
ingestDocument();
createIndexWithPipeline(INDEX_NAME, "SparseEncodingIndexMappings.json", PIPELINE_NAME);
ingestDocument(INDEX_NAME, INGEST_DOCUMENT);
assertEquals(1, getDocCount(INDEX_NAME));

NeuralSparseQueryBuilder neuralSparseQueryBuilder = new NeuralSparseQueryBuilder();
@@ -58,8 +63,8 @@ public void testSparseEncodingProcessorWithPrune() throws Exception {
try {
modelId = prepareSparseEncodingModel();
createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING_PRUNE);
createSparseEncodingIndex();
ingestDocument();
createIndexWithPipeline(INDEX_NAME, "SparseEncodingIndexMappings.json", PIPELINE_NAME);
ingestDocument(INDEX_NAME, INGEST_DOCUMENT);
assertEquals(1, getDocCount(INDEX_NAME));

NeuralSparseQueryBuilder neuralSparseQueryBuilder = new NeuralSparseQueryBuilder();
@@ -74,42 +79,22 @@ public void testSparseEncodingProcessorWithPrune() throws Exception {
}
}

private void createSparseEncodingIndex() throws Exception {
createIndexWithConfiguration(
INDEX_NAME,
Files.readString(Path.of(classLoader.getResource("processor/SparseEncodingIndexMappings.json").toURI())),
PIPELINE_NAME
);
}

private void ingestDocument() throws Exception {
String ingestDocument = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"favor_list\": [\n"
+ " \"test\",\n"
+ " \"hello\",\n"
+ " \"mock\"\n"
+ " ],\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";
Response response = makeRequest(
client(),
"POST",
INDEX_NAME + "/_doc?refresh",
null,
toHttpEntity(ingestDocument),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
assertEquals("created", map.get("result"));
public void testSparseEncodingProcessorWithReindex() throws Exception {
// create a simple source index and ingest a document into it.
String fromIndexName = "test-reindex-from";
createIndexWithConfiguration(fromIndexName, "{ \"settings\": { \"number_of_shards\": 1, \"number_of_replicas\": 0 } }", null);
ingestDocument(fromIndexName, "{ \"text\": \"hello world\" }");
// create a sparse encoding index as the reindex destination
String modelId = null;
try {
modelId = prepareSparseEncodingModel();
String toIndexName = "test-reindex-to";
createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING);
createIndexWithPipeline(toIndexName, "SparseEncodingIndexMappings.json", PIPELINE_NAME);
reindex(fromIndexName, toIndexName);
assertEquals(1, getDocCount(toIndexName));
} finally {
wipeOfTestResources(fromIndexName, PIPELINE_NAME, modelId, null);
}
}

}
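
The createSparseEncodingIndex helper removed above is replaced by a createIndexWithPipeline call inherited from BaseNeuralSearchIT. That base-class helper is not part of this diff; the following is a minimal sketch of what it could look like, assuming it resolves the mapping file from the processor resource folder and delegates to createIndexWithConfiguration the way the removed helper did (the actual implementation in BaseNeuralSearchIT may differ):

    // Hypothetical sketch, not part of this commit; the real helper lives in BaseNeuralSearchIT.
    // Assumes java.nio.file.Files and java.nio.file.Path are available in the base class.
    protected void createIndexWithPipeline(String indexName, String indexMappingFileName, String pipelineName) throws Exception {
        // Resolve the mapping file from the shared processor test resources (assumed location).
        String indexConfiguration = Files.readString(
            Path.of(classLoader.getResource("processor/" + indexMappingFileName).toURI())
        );
        // Create the index with those mappings and attach the ingest pipeline as its default pipeline.
        createIndexWithConfiguration(indexName, indexConfiguration, pipelineName);
    }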
TextChunkingProcessorIT.java
@@ -4,11 +4,7 @@
*/
package org.opensearch.neuralsearch.processor;

import com.google.common.collect.ImmutableList;
import lombok.SneakyThrows;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicHeader;
import org.junit.Before;

import java.net.URL;
@@ -19,9 +15,6 @@
import java.util.Map;
import java.util.Objects;

import org.opensearch.client.Response;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.neuralsearch.BaseNeuralSearchIT;

@@ -73,7 +66,9 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmStandardToken
try {
createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
ingestDocument(TEST_DOCUMENT);

String document = getDocumentFromFilePath(TEST_DOCUMENT);
ingestDocument(INDEX_NAME, document);

List<String> expectedPassages = new ArrayList<>();
expectedPassages.add("This is an example document to be chunked. The document ");
@@ -90,7 +85,9 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmLetterTokeniz
try {
createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_LETTER_TOKENIZER_NAME);
createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_LETTER_TOKENIZER_NAME);
ingestDocument(TEST_DOCUMENT);

String document = getDocumentFromFilePath(TEST_DOCUMENT);
ingestDocument(INDEX_NAME, document);

List<String> expectedPassages = new ArrayList<>();
expectedPassages.add("This is an example document to be chunked. The document ");
@@ -107,7 +104,9 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmLowercaseToke
try {
createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_LOWERCASE_TOKENIZER_NAME);
createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_LOWERCASE_TOKENIZER_NAME);
ingestDocument(TEST_DOCUMENT);

String document = getDocumentFromFilePath(TEST_DOCUMENT);
ingestDocument(INDEX_NAME, document);

List<String> expectedPassages = new ArrayList<>();
expectedPassages.add("This is an example document to be chunked. The document ");
@@ -124,7 +123,10 @@ public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmStandardToken
try {
createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
Exception exception = assertThrows(Exception.class, () -> ingestDocument(TEST_LONG_DOCUMENT));
Exception exception = assertThrows(Exception.class, () -> {
String document = getDocumentFromFilePath(TEST_LONG_DOCUMENT);
ingestDocument(INDEX_NAME, document);
});
// max_token_count is 100 by index settings
assert (exception.getMessage()
.contains("The number of tokens produced by calling _analyze has exceeded the allowed maximum of [100]."));
@@ -139,7 +141,9 @@ public void testTextChunkingProcessor_withDelimiterAlgorithm_successful() {
try {
createPipelineProcessor(DELIMITER_PIPELINE_NAME);
createTextChunkingIndex(INDEX_NAME, DELIMITER_PIPELINE_NAME);
ingestDocument(TEST_DOCUMENT);

String document = getDocumentFromFilePath(TEST_DOCUMENT);
ingestDocument(INDEX_NAME, document);

List<String> expectedPassages = new ArrayList<>();
expectedPassages.add("This is an example document to be chunked.");
@@ -157,7 +161,9 @@ public void testTextChunkingProcessor_withCascadePipeline_successful() {
try {
createPipelineProcessor(CASCADE_PIPELINE_NAME);
createTextChunkingIndex(INDEX_NAME, CASCADE_PIPELINE_NAME);
ingestDocument(TEST_DOCUMENT);

String document = getDocumentFromFilePath(TEST_DOCUMENT);
ingestDocument(INDEX_NAME, document);

List<String> expectedPassages = new ArrayList<>();
expectedPassages.add("This is an example document to be chunked.");
@@ -176,6 +182,23 @@ public void testTextChunkingProcessor_withCascadePipeline_successful() {
}
}

public void testTextChunkingProcessor_withFixedTokenLengthAlgorithmStandardTokenizer_whenReindexingDocument_thenSuccessful()
throws Exception {
try {
String fromIndexName = "test-reindex-from";
createIndexWithConfiguration(fromIndexName, "{ \"settings\": { \"number_of_shards\": 1, \"number_of_replicas\": 0 } }", null);
String document = getDocumentFromFilePath(TEST_DOCUMENT);
ingestDocument(fromIndexName, document);

createPipelineProcessor(FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
createTextChunkingIndex(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME);
reindex(fromIndexName, INDEX_NAME);
assertEquals(1, getDocCount(INDEX_NAME));
} finally {
wipeOfTestResources(INDEX_NAME, FIXED_TOKEN_LENGTH_PIPELINE_WITH_STANDARD_TOKENIZER_NAME, null, null);
}
}

private void validateIndexIngestResults(String indexName, String fieldName, Object expected) {
assertEquals(1, getDocCount(indexName));
MatchAllQueryBuilder query = new MatchAllQueryBuilder();
@@ -205,23 +228,9 @@ private void createTextChunkingIndex(String indexName, String pipelineName) thro
createIndexWithConfiguration(indexName, Files.readString(Path.of(indexSettingsURLPath.toURI())), pipelineName);
}

private void ingestDocument(String documentPath) throws Exception {
URL documentURLPath = classLoader.getResource(documentPath);
private String getDocumentFromFilePath(String filePath) throws Exception {
URL documentURLPath = classLoader.getResource(filePath);
Objects.requireNonNull(documentURLPath);
String document = Files.readString(Path.of(documentURLPath.toURI()));
Response response = makeRequest(
client(),
"POST",
INDEX_NAME + "/_doc?refresh",
null,
toHttpEntity(document),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
assertEquals("created", map.get("result"));
return Files.readString(Path.of(documentURLPath.toURI()));
}
}
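
Both new reindex tests call a reindex(fromIndexName, toIndexName) helper inherited from BaseNeuralSearchIT, which is also not shown in this diff. A minimal sketch of what such a helper could look like, assuming it issues a synchronous _reindex request through the same makeRequest/toHttpEntity utilities that the removed ingest code used (the real implementation may differ):

    // Hypothetical sketch, not part of this commit; the real helper lives in BaseNeuralSearchIT.
    // Copies every document from the source index into the destination index so that the
    // destination's default ingest pipeline runs on each reindexed document.
    protected void reindex(String fromIndexName, String toIndexName) throws Exception {
        String requestBody = "{\n"
            + "  \"source\": { \"index\": \"" + fromIndexName + "\" },\n"
            + "  \"dest\": { \"index\": \"" + toIndexName + "\" }\n"
            + "}";
        Response response = makeRequest(
            client(),
            "POST",
            "_reindex?refresh=true",
            null,
            toHttpEntity(requestBody),
            ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
        );
        Map<String, Object> responseMap = XContentHelper.convertToMap(
            XContentType.JSON.xContent(),
            EntityUtils.toString(response.getEntity()),
            false
        );
        // A successful synchronous reindex reports an empty failures array.
        assertEquals(java.util.Collections.emptyList(), responseMap.get("failures"));
    }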