Skip to content

Commit

Permalink
Update tests for new update async code
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 9, 2025
1 parent 13d1b83 commit 0583004
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,21 +251,21 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
PROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
Math.max(8, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
DEPROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + DEPROVISION_WORKFLOW_THREAD_POOL
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessageFactory;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.DocWriteRequest.OpType;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
Expand Down Expand Up @@ -47,7 +48,6 @@
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.remote.metadata.client.DeleteDataObjectRequest;
import org.opensearch.remote.metadata.client.GetDataObjectRequest;
import org.opensearch.remote.metadata.client.PutDataObjectRequest;
Expand Down Expand Up @@ -845,7 +845,7 @@ public void addResourceToStateIndex(
String resourceId,
ActionListener<WorkflowData> listener
) {
addResourceToStateIndex(currentNodeInputs, nodeId, workflowStepName, resourceId, null, listener);
addResourceToStateIndex(currentNodeInputs, nodeId, workflowStepName, resourceId, "fakeTenantId", listener);
}

/**
Expand Down Expand Up @@ -892,7 +892,7 @@ public void addResourceToStateIndex(
*/
@Deprecated
public void deleteResourceFromStateIndex(String workflowId, ResourceCreated resourceToDelete, ActionListener<WorkflowData> listener) {
deleteResourceFromStateIndex(workflowId, null, resourceToDelete, listener);
deleteResourceFromStateIndex(workflowId, "fakeTenantId", resourceToDelete, listener);
}

/**
Expand Down Expand Up @@ -967,7 +967,7 @@ private void getAndUpdateResourceInStateDocumentWithRetries(
try {
GetResponse getResponse = GetResponse.fromXContent(r.parser());
handleStateGetResponse(workflowId, tenantId, resource, operation, retries, listener, getResponse);
} catch (IOException e) {
} catch (Exception e) {
logger.error("Failed to parse get response", e);
listener.onFailure(new FlowFrameworkException("Failed to parse get response", INTERNAL_SERVER_ERROR));

Check warning on line 972 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L970-L972

Added lines #L970 - L972 were not covered by tests
}
Expand All @@ -991,17 +991,16 @@ private void handleStateGetResponse(
listener.onFailure(new FlowFrameworkException("Workflow state not found for " + workflowId, RestStatus.NOT_FOUND));
return;
}
WorkflowState currentState;
try {
currentState = WorkflowState.parse(getResponse.getSourceAsString());
WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString());
List<ResourceCreated> resourcesCreated = new ArrayList<>(currentState.resourcesCreated());
if (operation == OpType.DELETE) {
resourcesCreated.removeIf(r -> r.resourceMap().equals(resource.resourceMap()));
} else {
resourcesCreated.add(resource);
}
WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build();
UpdateDataObjectRequest updateRequest2 = UpdateDataObjectRequest.builder()
UpdateDataObjectRequest updateRequest = UpdateDataObjectRequest.builder()
.index(WORKFLOW_STATE_INDEX)
.id(workflowId)
.tenantId(tenantId)
Expand All @@ -1010,7 +1009,7 @@ private void handleStateGetResponse(
.ifPrimaryTerm(getResponse.getPrimaryTerm())
.build();
sdkClient.updateDataObjectAsync(
updateRequest2,
updateRequest,
client.threadPool().executor(operation == OpType.DELETE ? DEPROVISION_WORKFLOW_THREAD_POOL : PROVISION_WORKFLOW_THREAD_POOL)
).whenComplete((r, throwable) -> {
if (throwable == null) {
Expand Down Expand Up @@ -1059,7 +1058,7 @@ private void handleStateUpdateException(
ActionListener<WorkflowData> listener,
Exception e
) {
if (e instanceof VersionConflictEngineException && retries > 0) {
if (e instanceof OpenSearchStatusException && ((OpenSearchStatusException) e).status() == RestStatus.CONFLICT && retries > 0) {
// Retry if we haven't exhausted retries
getAndUpdateResourceInStateDocumentWithRetries(workflowId, tenantId, newResource, operation, retries - 1, listener);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private void executeDeprovisionSequence(
deprovisionFuture.get();
logger.info("Successful {} for {}", deprovisionNode.id(), resourceNameAndId);
// Remove from state index resource list
flowFrameworkIndicesHandler.deleteResourceFromStateIndex(workflowId, resource, stateUpdateFuture);
flowFrameworkIndicesHandler.deleteResourceFromStateIndex(workflowId, tenantId, resource, stateUpdateFuture);
try {
// Wait at most 1 second for state index update.
stateUpdateFuture.actionGet(1, TimeUnit.SECONDS);
Expand Down
Loading

0 comments on commit 0583004

Please sign in to comment.