Skip to content

Commit

Permalink
add retry
Browse files Browse the repository at this point in the history
  • Loading branch information
will-hwang committed Jan 8, 2025
1 parent baa2929 commit 3066082
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import static org.opensearch.neuralsearch.util.TestUtils.NODES_BWC_CLUSTER;
import static org.opensearch.neuralsearch.util.TestUtils.TEXT_IMAGE_EMBEDDING_PROCESSOR;
import static org.opensearch.neuralsearch.util.TestUtils.getModelId;

import org.opensearch.client.Response;
import org.opensearch.neuralsearch.query.NeuralQueryBuilder;

public class MultiModalSearchIT extends AbstractRollingUpgradeTestCase {
Expand All @@ -24,15 +26,23 @@ public class MultiModalSearchIT extends AbstractRollingUpgradeTestCase {
private static final String TEST_IMAGE_TEXT_UPGRADED = "/9j/4AAQSkZJR8eydhgfwceocvlk";

private static final int NUM_DOCS_PER_ROUND = 1;
private static String modelId = "";

// Test rolling-upgrade test image embedding processor
// Create Text Image Embedding Processor, Ingestion Pipeline and add document
// Validate process , pipeline and document count in rolling-upgrade scenario
public void testTextImageEmbeddingProcessor_E2EFlow() throws Exception {
waitForClusterHealthGreen(NODES_BWC_CLUSTER);
int count = 0;
int maxCount = 3;
Response response;
do {
response = waitForClusterHealthGreen(NODES_BWC_CLUSTER);
count++;
} while (count < maxCount && response.getStatusLine().getStatusCode() == 200 );

switch (getClusterType()) {
case OLD:
String modelId = uploadTextImageEmbeddingModel();
modelId = uploadTextImageEmbeddingModel();
loadModel(modelId);
createPipelineForTextImageProcessor(modelId, PIPELINE_NAME);
createIndexWithConfiguration(
Expand All @@ -43,27 +53,26 @@ public void testTextImageEmbeddingProcessor_E2EFlow() throws Exception {
addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT);
break;
case MIXED:
String modelId2 = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_IMAGE_EMBEDDING_PROCESSOR);
modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_IMAGE_EMBEDDING_PROCESSOR);
int totalDocsCountMixed;
if (isFirstMixedRound()) {
totalDocsCountMixed = NUM_DOCS_PER_ROUND;
validateTestIndexOnUpgrade(totalDocsCountMixed, modelId2, TEXT, TEST_IMAGE_TEXT);
validateTestIndexOnUpgrade(totalDocsCountMixed, modelId, TEXT, TEST_IMAGE_TEXT);
addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_MIXED, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT_MIXED);
} else {
totalDocsCountMixed = 2 * NUM_DOCS_PER_ROUND;
validateTestIndexOnUpgrade(totalDocsCountMixed, modelId2, TEXT_MIXED, TEST_IMAGE_TEXT_MIXED);
validateTestIndexOnUpgrade(totalDocsCountMixed, modelId, TEXT_MIXED, TEST_IMAGE_TEXT_MIXED);
}
break;
case UPGRADED:
String modelId3 = null;
try {
modelId3 = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_IMAGE_EMBEDDING_PROCESSOR);
modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_IMAGE_EMBEDDING_PROCESSOR);
int totalDocsCountUpgraded = 3 * NUM_DOCS_PER_ROUND;
loadModel(modelId3);
loadModel(modelId);
addDocument(getIndexNameForTest(), "2", TEST_FIELD, TEXT_UPGRADED, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT_UPGRADED);
validateTestIndexOnUpgrade(totalDocsCountUpgraded, modelId3, TEXT_UPGRADED, TEST_IMAGE_TEXT_UPGRADED);
validateTestIndexOnUpgrade(totalDocsCountUpgraded, modelId, TEXT_UPGRADED, TEST_IMAGE_TEXT_UPGRADED);
} finally {
wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId3, null);
wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null);
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1280,13 +1280,13 @@ protected String registerModelGroup(final String modelGroupRegisterRequestBody)
}

// Method that waits till the health of nodes in the cluster goes green
protected void waitForClusterHealthGreen(final String numOfNodes) throws IOException {
protected Response waitForClusterHealthGreen(final String numOfNodes) throws IOException {
Request waitForGreen = new Request("GET", "/_cluster/health");
waitForGreen.addParameter("wait_for_nodes", numOfNodes);
waitForGreen.addParameter("wait_for_status", "green");
waitForGreen.addParameter("cluster_manager_timeout", "60s");
waitForGreen.addParameter("timeout", "60s");
client().performRequest(waitForGreen);
return client().performRequest(waitForGreen);
}

/**
Expand Down

0 comments on commit 3066082

Please sign in to comment.