Skip to content

Commit

Permalink
Replace fakeTenantId placeholders with actual tenant id
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 23, 2025
1 parent a325ad4 commit 72e3076
Show file tree
Hide file tree
Showing 29 changed files with 139 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -876,26 +876,6 @@ public void deleteFlowFrameworkSystemIndexDoc(String documentId, String tenantId
}
}

/**
* Adds a resource to the state index, including common exception handling
* @param currentNodeInputs Inputs to the current node
* @param nodeId current process node (workflow step) id
* @param workflowStepName the workflow step name that created the resource
* @param resourceId the id of the newly created resource
* @param listener the ActionListener for this step to handle completing the future after update
* @deprecated here temporarily until tenantID passed https://github.com/opensearch-project/flow-framework/issues/987
*/
@Deprecated
public void addResourceToStateIndex(
WorkflowData currentNodeInputs,
String nodeId,
String workflowStepName,
String resourceId,
ActionListener<WorkflowData> listener
) {
addResourceToStateIndex(currentNodeInputs, nodeId, workflowStepName, resourceId, "fakeTenantId", listener);
}

/**
* Adds a resource to the state index, including common exception handling
* @param currentNodeInputs Inputs to the current node
Expand Down Expand Up @@ -931,18 +911,6 @@ public void addResourceToStateIndex(
}
}

/**
* Removes a resource from the state index, including common exception handling
* @param workflowId The workflow document id in the state index
* @param resourceToDelete The resource to delete
* @param listener the ActionListener for this step to handle completing the future after update
* @deprecated here temporarily until tenantID passed https://github.com/opensearch-project/flow-framework/issues/987
*/
@Deprecated
public void deleteResourceFromStateIndex(String workflowId, ResourceCreated resourceToDelete, ActionListener<WorkflowData> listener) {
deleteResourceFromStateIndex(workflowId, "fakeTenantId", resourceToDelete, listener);
}

/**
* Removes a resource from the state index, including common exception handling
* @param workflowId The workflow document id in the state index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,12 @@ void checkMaxWorkflows(TimeValue requestTimeOut, Integer maxWorkflow, String ten

private void validateWorkflows(Template template) throws Exception {
for (Workflow workflow : template.workflows().values()) {
List<ProcessNode> sortedNodes = workflowProcessSorter.sortProcessNodes(workflow, null, Collections.emptyMap(), "fakeTenantId");
List<ProcessNode> sortedNodes = workflowProcessSorter.sortProcessNodes(
workflow,
null,
Collections.emptyMap(),
template.getTenantId()
);
workflowProcessSorter.validate(sortedNodes, pluginsService);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void executeDeprovisionSequence(
this.threadPool,
DEPROVISION_WORKFLOW_THREAD_POOL,
flowFrameworkSettings.getRequestTimeout(),
"fakeTenantId"
tenantId
)
);
}
Expand Down Expand Up @@ -291,7 +291,7 @@ private void executeDeprovisionSequence(
this.threadPool,
DEPROVISION_WORKFLOW_THREAD_POOL,
pn.nodeTimeout(),
"fakeTenantId"
tenantId
);
}).collect(Collectors.toList());
// Pause briefly before next loop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ private void executeProvisionRequest(
provisionWorkflow,
workflowId,
request.getParams(),
"fakeTenantId"
tenantId
);
workflowProcessSorter.validate(provisionProcessSequence, pluginsService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private void executeReprovisionRequest(
provisionWorkflow,
request.getWorkflowId(),
Collections.emptyMap(), // TODO : Add suport to reprovision substitution templates
"fakeTenantId"
tenantId
);

try {
Expand All @@ -230,7 +230,8 @@ private void executeReprovisionRequest(
workflowId,
originalTemplate,
updatedTemplate,
resourceCreated
resourceCreated,
tenantId
);

// Remove error field if any prior to subsequent execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
currentNodeId,
getName(),
pipelineId,
tenantId,
createPipelineFuture
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public PlainActionFuture<WorkflowData> execute(
registerLocalModelFuture,
taskId,
"Local model registration",
tenantId,
ActionListener.wrap(mlTaskWorkflowData -> {
// Registered Model Resource has been updated
String resourceName = getResourceByWorkflowStep(getName());
Expand All @@ -219,6 +220,7 @@ public PlainActionFuture<WorkflowData> execute(
currentNodeId,
DeployModelStep.NAME,
id,
tenantId,
deployUpdateListener
);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ protected AbstractRetryableWorkflowStep(
* @param future the workflow step future
* @param taskId the ml task id
* @param workflowStep the workflow step which requires a retry get ml task functionality
* @param tenantId the tenant ID
* @param mlTaskListener the ML Task Listener
*/
protected void retryableGetMlTask(
Expand All @@ -76,6 +77,7 @@ protected void retryableGetMlTask(
PlainActionFuture<WorkflowData> future,
String taskId,
String workflowStep,
String tenantId,
ActionListener<WorkflowData> mlTaskListener
) {
CompletableFuture.runAsync(() -> {
Expand All @@ -91,7 +93,14 @@ protected void retryableGetMlTask(
content.put(REGISTER_MODEL_STATUS, response.getState().toString());
mlTaskListener.onResponse(new WorkflowData(content, r.getWorkflowId(), r.getNodeId()));
}, mlTaskListener::onFailure);
flowFrameworkIndicesHandler.addResourceToStateIndex(currentNodeInputs, nodeId, getName(), id, resourceListener);
flowFrameworkIndicesHandler.addResourceToStateIndex(
currentNodeInputs,
nodeId,
getName(),
id,
tenantId,
resourceListener
);
break;
case FAILED:
case COMPLETED_WITH_ERROR:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {
currentNodeId,
getName(),
mlCreateConnectorResponse.getConnectorId(),
tenantId,
createConnectorFuture
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public PlainActionFuture<WorkflowData> execute(
currentNodeId,
getName(),
indexName,
tenantId,
createIndexFuture
);
}, ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void onResponse(MLDeployModelResponse mlDeployModelResponse) {
deployModelFuture,
taskId,
"Deploy model",
tenantId,
ActionListener.wrap(
deployModelFuture::onResponse,
e -> deployModelFuture.onFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) {
currentNodeId,
getName(),
mlRegisterAgentResponse.getAgentId(),
tenantId,
registerAgentModelFuture
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse
currentNodeId,
getName(),
mlRegisterModelGroupResponse.getModelGroupId(),
tenantId,
resourceListener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public void onResponse(MLRegisterModelResponse mlRegisterModelResponse) {
currentNodeId,
getName(),
mlRegisterModelResponse.getModelId(),
tenantId,
registerUpdateListener
);
}
Expand All @@ -184,6 +185,7 @@ private void updateDeployResource(String resourceName, MLRegisterModelResponse m
currentNodeId,
DeployModelStep.NAME,
mlRegisterModelResponse.getModelId(),
tenantId,
deployUpdateListener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,16 @@ public List<ProcessNode> sortProcessNodes(Workflow workflow, String workflowId,
* @param originalTemplate the original template currently indexed
* @param updatedTemplate the updated template to be executed
* @param resourcesCreated the resources previously created for the workflow
* @param tenantId the tenant id
* @throws Exception for issues creating the reprovision sequence
* @return A list of ProcessNode
*/
public List<ProcessNode> createReprovisionSequence(
String workflowId,
Template originalTemplate,
Template updatedTemplate,
List<ResourceCreated> resourcesCreated
List<ResourceCreated> resourcesCreated,
String tenantId
) throws Exception {

Workflow updatedWorkflow = updatedTemplate.workflows().get(PROVISION_WORKFLOW);
Expand Down Expand Up @@ -205,7 +207,8 @@ public List<ProcessNode> createReprovisionSequence(
updatedWorkflow,
sortedUpdatedNodes,
originalTemplateMap,
resourcesCreated
resourcesCreated,
tenantId
);

// If the reprovision sequence consists entirely of WorkflowDataSteps, then no modifications were made to the exisiting template.
Expand All @@ -223,6 +226,7 @@ public List<ProcessNode> createReprovisionSequence(
* @param sortedUpdatedNodes the topologically sorted updated template nodes
* @param originalTemplateMap a map of node Id to workflow node of the original template
* @param resourcesCreated a list of resources created for this template
* @param tenantId the tenant id
* @return a list of process node representing the reprovision sequence
* @throws Exception for issues creating the reprovision sequence
*/
Expand All @@ -231,7 +235,8 @@ private List<ProcessNode> createReprovisionSequence(
Workflow updatedWorkflow,
List<WorkflowNode> sortedUpdatedNodes,
Map<String, WorkflowNode> originalTemplateMap,
List<ResourceCreated> resourcesCreated
List<ResourceCreated> resourcesCreated,
String tenantId
) throws Exception {
Map<String, ProcessNode> idToNodeMap = new HashMap<>();
List<ProcessNode> reprovisionSequence = new ArrayList<>();
Expand All @@ -243,7 +248,8 @@ private List<ProcessNode> createReprovisionSequence(
originalTemplateMap,
resourcesCreated,
workflowId,
idToNodeMap
idToNodeMap,
tenantId
);
if (processNode != null) {
idToNodeMap.put(processNode.id(), processNode);
Expand All @@ -262,6 +268,7 @@ private List<ProcessNode> createReprovisionSequence(
* @param resourcesCreated a list of resources created for this template
* @param workflowId the workflow ID associated with the template
* @param idToNodeMap a map of the current reprovision sequence
* @param tenantId the tenant id
* @return a ProcessNode
* @throws Exception for issues creating the process node
*/
Expand All @@ -271,7 +278,8 @@ private ProcessNode createProcessNode(
Map<String, WorkflowNode> originalTemplateMap,
List<ResourceCreated> resourcesCreated,
String workflowId,
Map<String, ProcessNode> idToNodeMap
Map<String, ProcessNode> idToNodeMap,
String tenantId
) throws Exception {
WorkflowData data = new WorkflowData(node.userInputs(), updatedWorkflow.userParams(), workflowId, node.id());
List<ProcessNode> predecessorNodes = updatedWorkflow.edges()
Expand All @@ -284,15 +292,15 @@ private ProcessNode createProcessNode(

if (!originalTemplateMap.containsKey(node.id())) {
// Case 1: Additive modification, create new node
return createNewProcessNode(node, data, predecessorNodes, nodeTimeout);
return createNewProcessNode(node, data, predecessorNodes, nodeTimeout, tenantId);
} else {
WorkflowNode originalNode = originalTemplateMap.get(node.id());
if (shouldUpdateNode(node, originalNode)) {
// Case 2: Existing modification, create update step
return createUpdateProcessNode(node, data, predecessorNodes, nodeTimeout);
return createUpdateProcessNode(node, data, predecessorNodes, nodeTimeout, tenantId);
} else {
// Case 4: No modification to existing node, create proxy step
return createWorkflowDataStepNode(node, data, predecessorNodes, nodeTimeout, resourcesCreated);
return createWorkflowDataStepNode(node, data, predecessorNodes, nodeTimeout, resourcesCreated, tenantId);
}
}
}
Expand All @@ -303,13 +311,15 @@ private ProcessNode createProcessNode(
* @param data the current node data
* @param predecessorNodes the current node predecessors
* @param nodeTimeout the current node timeout
* @param tenantId the tenant id
* @return a Process Node
*/
private ProcessNode createNewProcessNode(
WorkflowNode node,
WorkflowData data,
List<ProcessNode> predecessorNodes,
TimeValue nodeTimeout
TimeValue nodeTimeout,
String tenantId
) {
WorkflowStep step = workflowStepFactory.createStep(node.type());
return new ProcessNode(
Expand All @@ -322,7 +332,7 @@ private ProcessNode createNewProcessNode(
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout,
"fakeTenantId"
tenantId
);
}

Expand All @@ -332,14 +342,16 @@ private ProcessNode createNewProcessNode(
* @param data the current node data
* @param predecessorNodes the current node predecessors
* @param nodeTimeout the current node timeout
* @param tenantId the tenant id
* @return a ProcessNode
* @throws FlowFrameworkException if the current node does not support updates
*/
private ProcessNode createUpdateProcessNode(
WorkflowNode node,
WorkflowData data,
List<ProcessNode> predecessorNodes,
TimeValue nodeTimeout
TimeValue nodeTimeout,
String tenantId
) throws FlowFrameworkException {
String updateStepName = WorkflowResources.getUpdateStepByWorkflowStep(node.type());
if (updateStepName != null) {
Expand All @@ -354,7 +366,7 @@ private ProcessNode createUpdateProcessNode(
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout,
"fakeTenantId"
tenantId
);
} else {
// Case 3 : Cannot update step (not supported)
Expand All @@ -372,14 +384,16 @@ private ProcessNode createUpdateProcessNode(
* @param predecessorNodes the current node predecessors
* @param nodeTimeout the current node timeout
* @param resourcesCreated the list of resources created for the template assoicated with this node
* @param tenantId the tenant id
* @return a Process node
*/
private ProcessNode createWorkflowDataStepNode(
WorkflowNode node,
WorkflowData data,
List<ProcessNode> predecessorNodes,
TimeValue nodeTimeout,
List<ResourceCreated> resourcesCreated
List<ResourceCreated> resourcesCreated,
String tenantId
) {
ResourceCreated nodeResource = resourcesCreated.stream()
.filter(rc -> rc.workflowStepId().equals(node.id()))
Expand All @@ -397,7 +411,7 @@ private ProcessNode createWorkflowDataStepNode(
threadPool,
PROVISION_WORKFLOW_THREAD_POOL,
nodeTimeout,
"fakeTenantId"
tenantId
);
} else {
return null;
Expand Down
Loading

0 comments on commit 72e3076

Please sign in to comment.