Skip to content

Commit

Permalink
Re-enabled multi-node integration tests and Fix integration test set up
Browse files Browse the repository at this point in the history
(opensearch-project#432)

* Wiping system indices, moving ml config check to individual tests that
create a connector

Signed-off-by: Joshua Palis <[email protected]>

* spotless

Signed-off-by: Joshua Palis <[email protected]>

* Increasing ML config index timeout for multi-node case

Signed-off-by: Joshua Palis <[email protected]>

* Including flow framework config in wipeAllODFEIndices method

Signed-off-by: Joshua Palis <[email protected]>

* exluding model group index from wipe

Signed-off-by: Joshua Palis <[email protected]>

* testing wipe all indices with client

Signed-off-by: Joshua Palis <[email protected]>

* Reverting wipe method back to adminClient

Signed-off-by: Joshua Palis <[email protected]>

* removing deprovision from model tests, not necessary since we're
wiping indices after each test run

Signed-off-by: Joshua Palis <[email protected]>

* Wrapping testFailedUpdateWorkflow provision in ml config check

Signed-off-by: Joshua Palis <[email protected]>

* Fixing failing security tests

Signed-off-by: Joshua Palis <[email protected]>

* Testing model redeploy settings

Signed-off-by: Joshua Palis <[email protected]>

---------

Signed-off-by: Joshua Palis <[email protected]>
Co-authored-by: Owais Kazi <[email protected]>
  • Loading branch information
2 people authored and dbwiddis committed Jan 22, 2024
1 parent dbfaef8 commit fbe0475
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 87 deletions.
20 changes: 8 additions & 12 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -237,18 +237,6 @@ ext{
cluster.setting("plugins.security.authcz.admin_dn", "\n- CN=kirk,OU=client,O=client,L=test, C=de")
cluster.setting('plugins.security.restapi.roles_enabled', '["all_access", "security_rest_api_access"]')
cluster.setting('plugins.security.system_indices.enabled', "true")
cluster.setting('plugins.security.system_indices.indices', '[' +
'".plugins-ml-config", ' +
'".plugins-ml-connector", ' +
'".plugins-ml-model-group", ' +
'".plugins-ml-model", ".plugins-ml-task", ' +
'".plugins-ml-conversation-meta", ' +
'".plugins-ml-conversation-interactions", ' +
'".plugins-flow-framework-config", ' +
'".plugins-flow-framework-templates", ' +
'".plugins-flow-framework-state"' +
']'
)
cluster.setSecure(true)
}
}
Expand Down Expand Up @@ -423,6 +411,14 @@ task integTestRemote(type: RestIntegTestTask) {

// Automatically sets up the integration test cluster locally
run {
doFirst {
// There seems to be an issue when running multi node run or integ tasks with unicast_hosts
// not being written, the waitForAllConditions ensures it's written
getClusters().forEach { cluster ->
cluster.waitForAllConditions()
}
}

useCluster testClusters.integTest
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -70,67 +71,79 @@ public abstract class FlowFrameworkRestTestCase extends OpenSearchRestTestCase {
@Before
protected void setUpSettings() throws Exception {

if (!indexExistsWithAdminClient(".plugins-ml-config")) {
// Initial cluster set up

// Enable Flow Framework Plugin Rest APIs
Response response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"transient\":{\"plugins.flow_framework.enabled\":true}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Enable ML Commons to run on non-ml nodes
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.only_run_on_ml_node\":false}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Enable local model registration via URL
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.allow_registering_model_via_url\":true}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Need a delay here on 2.x or next line consistently fails tests.
// TODO: figure out know why we need this and we should pursue a better option that doesn't require HTTP5
Thread.sleep(10000);
// Set ML jvm heap memory threshold to 100 to avoid opening the circuit breaker during tests
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":100}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());
// Enable Flow Framework Plugin Rest APIs
Response response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"transient\":{\"plugins.flow_framework.enabled\":true}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Ensure .plugins-ml-config is created before proceeding with integration tests
assertBusy(() -> { assertTrue(indexExistsWithAdminClient(".plugins-ml-config")); }, 60, TimeUnit.SECONDS);
}
// Enable ML Commons to run on non-ml nodes
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.only_run_on_ml_node\":false}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Enable local model registration via URL
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.allow_registering_model_via_url\":true}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Set ML jvm heap memory threshold to 100 to avoid opening the circuit breaker during tests
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":100}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Set ML auto redeploy to false
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.model_auto_redeploy.enable\":false}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Set ML auto redeploy retries to 0
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.model_auto_redeploy.lifetime_retry_times\":0}}",
List.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Set up clients if running in security enabled cluster
if (isHttps()) {
String fullAccessUserPassword = generatePassword(FULL_ACCESS_USER);
String readAccessUserPassword = generatePassword(READ_ACCESS_USER);

// Configure full access user and client, needs ML Full Access role as well
Response response = createUser(
response = createUser(
FULL_ACCESS_USER,
fullAccessUserPassword,
List.of(FLOW_FRAMEWORK_FULL_ACCESS_ROLE, ML_COMMONS_FULL_ACCESS_ROLE)
Expand Down Expand Up @@ -185,7 +198,7 @@ protected static void deleteIndexWithAdminClient(String name) throws IOException

// Utility fn for checking if an index exists. Should only be used when not allowed in a regular context
// (e.g., checking existence of system indices)
protected static boolean indexExistsWithAdminClient(String indexName) throws IOException {
public static boolean indexExistsWithAdminClient(String indexName) throws IOException {
Request request = new Request("HEAD", "/" + indexName);
Response response = adminClient().performRequest(request);
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
Expand Down Expand Up @@ -247,6 +260,40 @@ protected static void configureHttpsClient(RestClientBuilder builder, Settings s
}
}

@SuppressWarnings("unchecked")
@After
protected void wipeAllODFEIndices() throws IOException {
Response response = adminClient().performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all"));
MediaType xContentType = MediaType.fromMediaType(response.getEntity().getContentType().getValue());
try (
XContentParser parser = xContentType.xContent()
.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.getEntity().getContent()
)
) {
XContentParser.Token token = parser.nextToken();
List<Map<String, Object>> parserList = null;
if (token == XContentParser.Token.START_ARRAY) {
parserList = parser.listOrderedMap().stream().map(obj -> (Map<String, Object>) obj).collect(Collectors.toList());
} else {
parserList = Collections.singletonList(parser.mapOrdered());
}

for (Map<String, Object> index : parserList) {
String indexName = (String) index.get("index");
// Do not reset ML/Flow Framework encryption index as this is needed to encrypt connector credentials
if (indexName != null
&& !".opendistro_security".equals(indexName)
&& !".plugins-ml-config".equals(indexName)
&& !".plugins-flow-framework-config".equals(indexName)) {
adminClient().performRequest(new Request("DELETE", "/" + indexName));
}
}
}
}

/**
* wipeAllIndices won't work since it cannot delete security index. Use wipeAllSystemIndices instead.
*/
Expand Down Expand Up @@ -284,12 +331,13 @@ protected Response createWorkflow(RestClient client, Template template) throws E

/**
* Helper method to invoke the Create Workflow Rest Action with provision
* @param client the rest client
* @param template the template to create
* @throws Exception if the request fails
* @return a rest response
*/
protected Response createWorkflowWithProvision(Template template) throws Exception {
return TestHelpers.makeRequest(client(), "POST", WORKFLOW_URI + "?provision=true", Collections.emptyMap(), template.toJson(), null);
protected Response createWorkflowWithProvision(RestClient client, Template template) throws Exception {
return TestHelpers.makeRequest(client, "POST", WORKFLOW_URI + "?provision=true", Collections.emptyMap(), template.toJson(), null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testFailedUpdateWorkflow() throws Exception {
assertEquals(RestStatus.CREATED, TestHelpers.restStatus(responseCreate));

Template template = TestHelpers.createTemplateFromFile("createconnector-registerremotemodel-deploymodel.json");
Thread.sleep(1000);

ResponseException exception = expectThrows(ResponseException.class, () -> updateWorkflow(client(), "123", template));
assertTrue(exception.getMessage().contains("Failed to get template: 123"));

Expand All @@ -84,8 +84,14 @@ public void testFailedUpdateWorkflow() throws Exception {
Map<String, Object> responseMap = entityAsMap(response);
String workflowId = (String) responseMap.get(WORKFLOW_ID);

// Provision Template
Response provisionResponse = provisionWorkflow(client(), workflowId);
// Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status
Response provisionResponse;
if (!indexExistsWithAdminClient(".plugins-ml-config")) {
assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS);
provisionResponse = provisionWorkflow(client(), workflowId);
} else {
provisionResponse = provisionWorkflow(client(), workflowId);
}
assertEquals(RestStatus.OK, TestHelpers.restStatus(provisionResponse));
getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS);

Expand Down Expand Up @@ -163,12 +169,6 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(0).resourceId());
assertEquals("deploy_model", resourcesCreated.get(1).workflowStepName());
assertNotNull(resourcesCreated.get(1).resourceId());

// Deprovision the workflow to avoid opening circut breaker when running additional tests
Response deprovisionResponse = deprovisionWorkflow(client(), workflowId);

// wait for deprovision to complete
Thread.sleep(5000);
}

public void testCreateAndProvisionCyclicalTemplate() throws Exception {
Expand Down Expand Up @@ -215,8 +215,14 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
String workflowId = (String) responseMap.get(WORKFLOW_ID);
getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED);

// Hit Provision API and assert status
response = provisionWorkflow(client(), workflowId);
// Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status
if (!indexExistsWithAdminClient(".plugins-ml-config")) {
assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS);
response = provisionWorkflow(client(), workflowId);
} else {
response = provisionWorkflow(client(), workflowId);
}

assertEquals(RestStatus.OK, TestHelpers.restStatus(response));
getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS);

Expand All @@ -231,19 +237,19 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(1).resourceId());
assertEquals("deploy_model", resourcesCreated.get(2).workflowStepName());
assertNotNull(resourcesCreated.get(2).resourceId());

// Deprovision the workflow to avoid opening circuit breaker when running additional tests
Response deprovisionResponse = deprovisionWorkflow(client(), workflowId);

// wait for deprovision to complete
Thread.sleep(5000);
}

public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
Template template = TestHelpers.createTemplateFromFile("agent-framework.json");

// Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter
Response response = createWorkflowWithProvision(template);
Response response;
if (!indexExistsWithAdminClient(".plugins-ml-config")) {
assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS);
response = createWorkflowWithProvision(client(), template);
} else {
response = createWorkflowWithProvision(client(), template);
}
assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response));
Map<String, Object> responseMap = entityAsMap(response);
String workflowId = (String) responseMap.get(WORKFLOW_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;

Expand Down Expand Up @@ -116,7 +117,12 @@ public void testCreateProvisionDeprovisionWorkflowWithFullAccess() throws Except
assertEquals(RestStatus.OK, searchResponse.status());

// Invoke provision API
response = provisionWorkflow(fullAccessClient(), workflowId);
if (!indexExistsWithAdminClient(".plugins-ml-config")) {
assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS);
response = provisionWorkflow(fullAccessClient(), workflowId);
} else {
response = provisionWorkflow(fullAccessClient(), workflowId);
}
assertEquals(RestStatus.OK, TestHelpers.restStatus(response));

// Invoke status API
Expand Down

0 comments on commit fbe0475

Please sign in to comment.