Skip to content

Commit

Permalink
Switch SdkClient to use default generic thread executor
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 15, 2025
1 parent 0583004 commit a5cbf25
Show file tree
Hide file tree
Showing 20 changed files with 141 additions and 103 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ dependencies {
version { strictly("${jacksonVersion}") }
}
// Multi-tenant SDK Client
implementation "org.opensearch:opensearch-remote-metadata-sdk:${opensearch_version}"
implementation "org.opensearch:opensearch-remote-metadata-sdk:${opensearch_build}"

// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ public Collection<Object> createComponents(
Map.entry(TENANT_AWARE_KEY, "true"),
Map.entry(TENANT_ID_FIELD_KEY, TENANT_ID_FIELD)

Check warning on line 152 in src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java#L146-L152

Added lines #L146 - L152 were not covered by tests
)
: Collections.emptyMap()
: Collections.emptyMap(),
// TODO: Find a better thread pool or make one
client.threadPool().executor(ThreadPool.Names.GENERIC)
);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client, sdkClient, xContentRegistry);
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,13 @@
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.CONFIG_INDEX_MAPPING;
import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING;
import static org.opensearch.flowframework.common.CommonValue.META;
import static org.opensearch.flowframework.common.CommonValue.NO_SCHEMA_VERSION;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.SCHEMA_VERSION_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX_MAPPING;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

/**
Expand Down Expand Up @@ -359,7 +356,7 @@ private void putOrReplaceTemplateInGlobalContextIndex(String documentId, Templat
.dataObject(encryptorUtils.encryptTemplateCredentials(template))
.build();
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
sdkClient.putDataObjectAsync(request, client.threadPool().executor(WORKFLOW_THREAD_POOL)).whenComplete((r, throwable) -> {
sdkClient.putDataObjectAsync(request).whenComplete((r, throwable) -> {
context.restore();

Check warning on line 360 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L352-L360

Added lines #L352 - L360 were not covered by tests
if (throwable == null) {
try {
Expand Down Expand Up @@ -426,24 +423,23 @@ public void putInitialStateToWorkflowState(String workflowId, String tenantId, U
.dataObject(state)
.build();
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
sdkClient.putDataObjectAsync(putRequest, client.threadPool().executor(PROVISION_WORKFLOW_THREAD_POOL))
.whenComplete((r, throwable) -> {
context.restore();
if (throwable == null) {
try {
IndexResponse indexResponse = IndexResponse.fromXContent(r.parser());
listener.onResponse(indexResponse);
} catch (IOException e) {
logger.error("Failed to parse index response", e);
listener.onFailure(new FlowFrameworkException("Failed to parse index response", INTERNAL_SERVER_ERROR));
}
} else {
Exception exception = SdkClientUtils.unwrapAndConvertToException(throwable);
String errorMessage = "Failed to put state index document";
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
sdkClient.putDataObjectAsync(putRequest).whenComplete((r, throwable) -> {
context.restore();

Check warning on line 427 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L419-L427

Added lines #L419 - L427 were not covered by tests
if (throwable == null) {
try {
IndexResponse indexResponse = IndexResponse.fromXContent(r.parser());
listener.onResponse(indexResponse);
} catch (IOException e) {
logger.error("Failed to parse index response", e);
listener.onFailure(new FlowFrameworkException("Failed to parse index response", INTERNAL_SERVER_ERROR));
}

Check warning on line 435 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L430-L435

Added lines #L430 - L435 were not covered by tests
});
} else {
Exception exception = SdkClientUtils.unwrapAndConvertToException(throwable);
String errorMessage = "Failed to put state index document";
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));

Check warning on line 440 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L437-L440

Added lines #L437 - L440 were not covered by tests
}
});

Check warning on line 442 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L442

Added line #L442 was not covered by tests
}
}, e -> {
String errorMessage = "Failed to create workflow_state index";
Expand Down Expand Up @@ -552,7 +548,7 @@ public void getTemplate(String documentId, String tenantId, ActionListener<GetRe
.id(documentId)
.tenantId(tenantId)
.build();
sdkClient.getDataObjectAsync(getRequest, client.threadPool().executor(WORKFLOW_THREAD_POOL)).whenComplete((r, throwable) -> {
sdkClient.getDataObjectAsync(getRequest).whenComplete((r, throwable) -> {
context.restore();
if (throwable == null) {
try {
Expand Down Expand Up @@ -586,7 +582,7 @@ public void getWorkflowState(String workflowId, String tenantId, ActionListener<
.id(workflowId)
.tenantId(tenantId)
.build();
sdkClient.getDataObjectAsync(getRequest, client.threadPool().executor(WORKFLOW_THREAD_POOL)).whenComplete((r, throwable) -> {
sdkClient.getDataObjectAsync(getRequest).whenComplete((r, throwable) -> {
context.restore();
if (throwable == null) {
try {
Expand Down Expand Up @@ -799,31 +795,30 @@ public void deleteFlowFrameworkSystemIndexDoc(String documentId, String tenantId
.id(documentId)
.tenantId(tenantId)
.build();
sdkClient.deleteDataObjectAsync(deleteRequest, client.threadPool().executor(DEPROVISION_WORKFLOW_THREAD_POOL))
.whenComplete((r, throwable) -> {
context.restore();
if (throwable == null) {
try {
DeleteResponse response = DeleteResponse.fromXContent(r.parser());
logger.info("Deleted workflow state doc: {}", documentId);
listener.onResponse(response);
} catch (Exception e) {
logger.error("Failed to parse delete response", e);
listener.onFailure(
new FlowFrameworkException("Failed to parse delete response", RestStatus.INTERNAL_SERVER_ERROR)
);
}
} else {
Exception exception = SdkClientUtils.unwrapAndConvertToException(throwable);
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to delete {} entry : {}",
WORKFLOW_STATE_INDEX,
documentId
).getFormattedMessage();
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
sdkClient.deleteDataObjectAsync(deleteRequest).whenComplete((r, throwable) -> {
context.restore();
if (throwable == null) {
try {
DeleteResponse response = DeleteResponse.fromXContent(r.parser());
logger.info("Deleted workflow state doc: {}", documentId);
listener.onResponse(response);
} catch (Exception e) {
logger.error("Failed to parse delete response", e);
listener.onFailure(

Check warning on line 807 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L805-L807

Added lines #L805 - L807 were not covered by tests
new FlowFrameworkException("Failed to parse delete response", RestStatus.INTERNAL_SERVER_ERROR)
);
}
});
} else {
Exception exception = SdkClientUtils.unwrapAndConvertToException(throwable);
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to delete {} entry : {}",
WORKFLOW_STATE_INDEX,
documentId
).getFormattedMessage();
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
}
});
}
}
}
Expand Down Expand Up @@ -959,10 +954,7 @@ private void getAndUpdateResourceInStateDocumentWithRetries(
.id(workflowId)
.tenantId(tenantId)
.build();
sdkClient.getDataObjectAsync(
getRequest,
client.threadPool().executor(operation == OpType.DELETE ? DEPROVISION_WORKFLOW_THREAD_POOL : PROVISION_WORKFLOW_THREAD_POOL)
).whenComplete((r, throwable) -> {
sdkClient.getDataObjectAsync(getRequest).whenComplete((r, throwable) -> {
if (throwable == null) {
try {
GetResponse getResponse = GetResponse.fromXContent(r.parser());
Expand Down Expand Up @@ -1008,10 +1000,7 @@ private void handleStateGetResponse(
.ifSeqNo(getResponse.getSeqNo())
.ifPrimaryTerm(getResponse.getPrimaryTerm())
.build();
sdkClient.updateDataObjectAsync(
updateRequest,
client.threadPool().executor(operation == OpType.DELETE ? DEPROVISION_WORKFLOW_THREAD_POOL : PROVISION_WORKFLOW_THREAD_POOL)
).whenComplete((r, throwable) -> {
sdkClient.updateDataObjectAsync(updateRequest).whenComplete((r, throwable) -> {
if (throwable == null) {
handleStateUpdateSuccess(workflowId, resource, operation, listener);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.checkFilterByBackendRoles;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
Expand Down Expand Up @@ -372,8 +371,7 @@ private void createExecute(WorkflowRequest request, User user, String tenantId,
logger.info("Querying existing workflow from global context: {}", workflowId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
sdkClient.getDataObjectAsync(
GetDataObjectRequest.builder().index(GLOBAL_CONTEXT_INDEX).id(workflowId).tenantId(tenantId).build(),
client.threadPool().executor(WORKFLOW_THREAD_POOL)
GetDataObjectRequest.builder().index(GLOBAL_CONTEXT_INDEX).id(workflowId).tenantId(tenantId).build()
).whenComplete((r, throwable) -> {
if (throwable == null) {
context.restore();
Expand Down Expand Up @@ -516,24 +514,23 @@ void checkMaxWorkflows(TimeValue requestTimeOut, Integer maxWorkflow, String ten
.tenantId(tenantId)
.build();
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
sdkClient.searchDataObjectAsync(searchRequest, client.threadPool().executor(WORKFLOW_THREAD_POOL))
.whenComplete((r, throwable) -> {
if (throwable == null) {
context.restore();
try {
SearchResponse searchResponse = SearchResponse.fromXContent(r.parser());
internalListener.onResponse(searchResponse.getHits().getTotalHits().value < maxWorkflow);
} catch (Exception e) {
logger.error("Failed to parse workflow searchResponse", e);
internalListener.onFailure(e);
}
} else {
Exception exception = SdkClientUtils.unwrapAndConvertToException(throwable);
String errorMessage = "Unable to fetch the workflows";
logger.error(errorMessage, exception);
internalListener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
sdkClient.searchDataObjectAsync(searchRequest).whenComplete((r, throwable) -> {
if (throwable == null) {
context.restore();
try {
SearchResponse searchResponse = SearchResponse.fromXContent(r.parser());
internalListener.onResponse(searchResponse.getHits().getTotalHits().value < maxWorkflow);
} catch (Exception e) {
logger.error("Failed to parse workflow searchResponse", e);
internalListener.onFailure(e);

Check warning on line 525 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L523-L525

Added lines #L523 - L525 were not covered by tests
}
});
} else {
Exception exception = SdkClientUtils.unwrapAndConvertToException(throwable);
String errorMessage = "Unable to fetch the workflows";
logger.error(errorMessage, exception);
internalListener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));

Check warning on line 531 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L528-L531

Added lines #L528 - L531 were not covered by tests
}
});
} catch (Exception e) {
String errorMessage = "Unable to fetch the workflows";
logger.error(errorMessage, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@

import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
import static org.opensearch.flowframework.util.ParseUtils.resolveUserAndExecute;
Expand Down Expand Up @@ -143,7 +142,7 @@ private void executeDeleteRequest(
.id(workflowId)
.tenantId(tenantId)
.build();
sdkClient.deleteDataObjectAsync(deleteRequest, client.threadPool().executor(WORKFLOW_THREAD_POOL)).whenComplete((r, throwable) -> {
sdkClient.deleteDataObjectAsync(deleteRequest).whenComplete((r, throwable) -> {
context.restore();
if (throwable == null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Arrays;

import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.util.ParseUtils.isAdmin;
import static org.opensearch.flowframework.util.RestHandlerUtils.getSourceContext;

Expand Down Expand Up @@ -122,7 +121,7 @@ private void doSearch(SearchRequest request, String tenantId, ActionListener<Sea
.tenantId(tenantId)
.searchSourceBuilder(request.source())
.build();
sdkClient.searchDataObjectAsync(searchRequest, client.threadPool().executor(WORKFLOW_THREAD_POOL)).whenComplete((r, throwable) -> {
sdkClient.searchDataObjectAsync(searchRequest).whenComplete((r, throwable) -> {
if (throwable == null) {
try {
SearchResponse searchResponse = SearchResponse.fromXContent(r.parser());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import static org.opensearch.flowframework.common.CommonValue.CONFIG_INDEX;
import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.MASTER_KEY;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;

/**
* Encryption utility class
Expand Down Expand Up @@ -324,7 +323,7 @@ private void generateAndIndexNewMasterKey(String tenantId, ActionListener<Boolea
.dataObject(config)
.build();
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
sdkClient.putDataObjectAsync(putRequest, client.threadPool().executor(WORKFLOW_THREAD_POOL)).whenComplete((r, throwable) -> {
sdkClient.putDataObjectAsync(putRequest).whenComplete((r, throwable) -> {
if (throwable == null) {
context.restore();
// Set generated key to master
Expand Down Expand Up @@ -372,8 +371,7 @@ private CompletableFuture<Void> cacheMasterKeyFromConfigIndex(String tenantId) {
.id(masterKeyId)
.tenantId(tenantId)
.fetchSourceContext(fetchSourceContext)
.build(),
client.threadPool().executor(WORKFLOW_THREAD_POOL)
.build()
).whenComplete((r, throwable) -> {
context.restore();
if (throwable == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;

/**
* Utility methods for Template parsing
Expand Down Expand Up @@ -418,7 +417,7 @@ public static void getWorkflow(
.id(workflowId)
.tenantId(tenantId)
.build();
sdkClient.getDataObjectAsync(request, client.threadPool().executor(WORKFLOW_THREAD_POOL)).whenComplete((r, throwable) -> {
sdkClient.getDataObjectAsync(request).whenComplete((r, throwable) -> {
if (throwable == null) {
GetResponse getResponse;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public void setUp() throws Exception {
when(client.admin()).thenReturn(adminClient);
when(adminClient.cluster()).thenReturn(clusterAdminClient);
threadPool = new TestThreadPool(FlowFrameworkPluginTests.class.getName());
when(client.threadPool()).thenReturn(threadPool);

environment = mock(Environment.class);
settings = Settings.builder().build();
Expand Down
Loading

0 comments on commit a5cbf25

Please sign in to comment.