support get batch transform job status in get task API #2825

Merged 5 commits on Sep 5, 2024


@rbhavna rbhavna commented Aug 13, 2024

Description

support get batch transform job status in get task API

//TODO: Will be adding UTs and ITs in next revision

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Comment on lines 192 to 194
CANCEL_BATCH,
BATCH_STATUS;

Collaborator

Suggest renaming these two action types to CANCEL_BATCH_PREDICT and BATCH_PREDICT_STATUS for consistency and extensibility. Other batch actions, such as batch_ingest, may be added in the future, so using only "batch" could be confusing.

}

private void processRemoteBatchPrediction(MLTask mlTask, String taskId, ActionListener<MLTaskGetResponse> actionListener) {

Collaborator

this empty line seems redundant.

@@ -275,6 +314,9 @@ public static MLTask parse(XContentParser parser) throws IOException {
case IS_ASYNC_TASK_FIELD:
async = parser.booleanValue();
break;
case TRANSFORM_JOB_FIELD:
transformJob = parser.map();
Collaborator

Later, in the transport action, all the values in this transformJob map are required to be Strings. Why not use parser.mapStrings() directly?

Collaborator Author

No, the values of transformJob are objects, not strings.

Comment on lines 150 to 159
Map<String, String> parameters = new HashMap<>();
for (Map.Entry<String, ?> entry : transformJob.entrySet()) {
    if (entry.getValue() instanceof String) {
        parameters.put(entry.getKey(), (String) entry.getValue());
    } else {
        log.debug("Value for key " + entry.getKey() + " is not a String");
    }
}
Collaborator

This logic can be moved to the StringUtils class so that it can be shared and reused.

Collaborator Author

Sure, will do.
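A minimal sketch of what such a shared helper could look like is given here for illustration. The class name `RemoteJobParams` and the method name `stringValuesOnly` are hypothetical and are not taken from the PR or from the existing StringUtils class; the loop body mirrors the snippet under review, without the debug logging.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical utility class; the name and location are assumptions, not the merged code.
final class RemoteJobParams {
    private RemoteJobParams() {}

    /** Copies only the entries whose values are Strings; all other entries are skipped. */
    static Map<String, String> stringValuesOnly(Map<String, ?> source) {
        Map<String, String> parameters = new HashMap<>();
        if (source == null) {
            return parameters;
        }
        for (Map.Entry<String, ?> entry : source.entrySet()) {
            if (entry.getValue() instanceof String) {
                parameters.put(entry.getKey(), (String) entry.getValue());
            }
        }
        return parameters;
    }

    public static void main(String[] args) {
        Map<String, Object> remoteJob = new LinkedHashMap<>();
        // Illustrative values only.
        remoteJob.put("TransformJobArn", "arn:aws:sagemaker:us-east-1:123456789012:transform-job/my-job");
        remoteJob.put("request_counts", Map.of("total", 3));
        System.out.println(stringValuesOnly(remoteJob));
    }
}
```

Note that non-String values, such as the nested map in the usage example, are dropped by this approach.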

.getDataAsMap();
if (dataAsMap != null
&& (dataAsMap.containsKey("TransformJobArn") || dataAsMap.containsKey("id"))) {
transformJob.putAll(dataAsMap);
Collaborator

I think you can just convert all the Object to String for easier data handling in the Get Task action, without any loss.

Collaborator Author

Okay. Let me try to make that change and verify

Comment on lines 173 to 175
ActionListener<Connector> listener = ActionListener.wrap(connector -> {
connector.decrypt(BATCH_STATUS.name(), (credential) -> encryptor.decrypt(credential));
RemoteConnectorExecutor connectorExecutor = MLEngineClassLoader
Collaborator

Is the connector access controller included? If it's just a one-line change, I think we should still add this access check.

Collaborator Author

You mean checking the user's access to the connector?

Collaborator Author

Then I guess we should also add the model access controller.

@Zhangxunmt
Collaborator

This would need the new "task status" action type in the connector actions body. Can you please add an example of this new action type to the test doc or this PR? Also, UTs are still missing for the GetTask transport action.

@@ -45,6 +51,8 @@ public class MLTask implements ToXContentObject, Writeable {
public static final String LAST_UPDATE_TIME_FIELD = "last_update_time";
public static final String ERROR_FIELD = "error";
public static final String IS_ASYNC_TASK_FIELD = "is_async";
public static final String TRANSFORM_JOB_FIELD = "transform_job";
Collaborator

transform_job seems specific to SageMaker. How about changing it to remote_job? cc @Zhangxunmt

Collaborator

"remote job" sounds good to me.

Collaborator Author

sure makes sense

Collaborator Author

How about remote_batch_job or batch_job?

Collaborator

I don't think all remote jobs are for batching purposes.

out.writeBoolean(true);
try {
AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
out.writeString(gson.toJson(transformJob));
Collaborator

@ylwu-amzn ylwu-amzn Aug 30, 2024

Can we use out.writeMap(..) ?

Collaborator

output.writeMap(transformJob, StreamOutput::writeString, StreamOutput::writeGenericValue)

Collaborator Author

For Map<String, Object> type? I am trying to change transformJob's type to Map<String, String> but getting some exceptions while testing

@Zhangxunmt Zhangxunmt mentioned this pull request Aug 30, 2024
5 tasks
}
}

if (parameters.containsKey("TransformJobArn") && parameters.get("TransformJobArn") != null) {
Collaborator

Add some comments?
I think TransformJobArn is only for SageMaker.

});
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
connectorAccessControlHelper
.getConnector(client, model.getConnectorId(), ActionListener.runBefore(listener, threadContext::restore));
Collaborator

Should we also consider a model with an internal connector? In that case there will be no connector ID.

Collaborator Author

Thanks for the catch. Let me add that case too

updatedTask.put(TRANSFORM_JOB_FIELD, transformJob);

if ((transformJob.containsKey("status") && transformJob.get("status").equals("completed"))
|| (transformJob.containsKey("TransformJobStatus")
Collaborator

This parsing logic looks limited to SageMaker, right? Does this PR only support SageMaker? Suggest adding a comment.

Collaborator Author

It's both SageMaker and OpenAI. Will add more comments.

@rbhavna rbhavna force-pushed the batch_task_status branch from 2e6186b to 2e616f0 Compare August 31, 2024 00:35
@rbhavna rbhavna had a problem deploying to ml-commons-cicd-env August 31, 2024 00:35 — with GitHub Actions Failure
@rbhavna rbhavna had a problem deploying to ml-commons-cicd-env August 31, 2024 00:36 — with GitHub Actions Failure
@rbhavna rbhavna force-pushed the batch_task_status branch from 2e616f0 to b6be1f5 Compare August 31, 2024 00:39
@rbhavna rbhavna had a problem deploying to ml-commons-cicd-env August 31, 2024 00:39 — with GitHub Actions Failure
@rbhavna rbhavna had a problem deploying to ml-commons-cicd-env August 31, 2024 00:39 — with GitHub Actions Failure
Comment on lines 173 to 175
if (streamOutputVersion.onOrAfter(MLTask.MINIMAL_SUPPORTED_VERSION_FOR_BATCH_TRANSFORM_JOB)) {
    if (remoteJob != null) {
        out.writeBoolean(true);
        try {
            AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
                out.writeString(gson.toJson(remoteJob));
                return null;
            });
        } catch (PrivilegedActionException e) {
            throw new RuntimeException(e);
        }
    } else {
        out.writeBoolean(false);
    }
}
Collaborator

You can use these two lines to replace this code block.
out.writeBoolean(true);
out.writeMap(remoteJob, StreamOutput::writeString, StreamOutput::writeGenericValue);

Comment on lines 140 to 138
if (input.readBoolean()) {
    String mapStr = input.readString();
    this.remoteJob = gson.fromJson(mapStr, Map.class);
}
Collaborator

this.remoteJob = input.readMap(s -> s.readString(), s -> s.readGenericValue());
Why not use this for simplicity?

Comment on lines +146 to +156
Map<String, String> parameters = new HashMap<>();
for (Map.Entry<String, ?> entry : remoteJob.entrySet()) {
    if (entry.getValue() instanceof String) {
        parameters.put(entry.getKey(), (String) entry.getValue());
    } else {
        log.debug("Value for key " + entry.getKey() + " is not a String");
    }
}
Collaborator

It seems remoteJob has to be <String, String>? If so, why not throw an exception when remoteJob is first created inside MLTask?

Collaborator Author

Yeah, it seems easier if remoteJob is a Map<String, String> from the moment it is created. Will make that change to avoid all this confusion.

Collaborator Author

When converting to Map<String, String>, the values that are not strings will be lost, so users may not be able to see the complete response from the remote job. I guess it is safer to use Map<String, Object> so that all key-value pairs from the remote service are retained.
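For comparison, the alternative raised earlier in the review (converting every value to a String rather than filtering) can be sketched as follows. This is only an illustration under stated assumptions: `RemoteJobValues` and `stringifyValues` are hypothetical names, and using `String.valueOf` for the conversion is an assumption rather than what the PR does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class RemoteJobValues {
    private RemoteJobValues() {}

    /** Keeps every key and converts each non-null value with String.valueOf. */
    static Map<String, String> stringifyValues(Map<String, ?> source) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : source.entrySet()) {
            Object value = entry.getValue();
            // Null values stay null instead of becoming the text "null".
            result.put(entry.getKey(), value == null ? null : String.valueOf(value));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> remoteJob = new LinkedHashMap<>();
        // Illustrative values only.
        remoteJob.put("id", "batch_1");
        remoteJob.put("completed_count", 3);
        System.out.println(stringifyValues(remoteJob));
    }
}
```

All keys survive this conversion, but nested maps and lists are reduced to their `toString()` form, so their structure is no longer available to callers. That trade-off is one reason to keep Map<String, Object>.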

Comment on lines 156 to 161
if (parameters.containsKey("TransformJobArn") && parameters.get("TransformJobArn") != null) {
    String jobArn = parameters.get("TransformJobArn");
    String transformJobName = jobArn.substring(jobArn.lastIndexOf("/") + 1);
    parameters.put("TransformJobName", transformJobName);
    parameters.remove("TransformJobArn");
}
Collaborator

parameters.put("TransformJobName", Optional.ofNullable(parameters.remove("TransformJobArn"))
.map(jobArn -> jobArn.substring(jobArn.lastIndexOf("/") + 1))
.orElse(null));

This block is used twice in the PR. Consider replacing it with this single statement to avoid code duplication.
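One detail worth noting: the single statement above always puts a "TransformJobName" entry, with a null value when no ARN is present, whereas the original if-block leaves the map untouched in that case. A possible variant that keeps the original behavior is sketched below. The class and method names are hypothetical, and the ARN in the usage example is made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, for illustration only.
final class TransformJobNames {
    private TransformJobNames() {}

    /**
     * Removes "TransformJobArn" and, if it held a non-null value, stores the text
     * after the last '/' under "TransformJobName". Does nothing else otherwise.
     */
    static void replaceArnWithName(Map<String, String> parameters) {
        Optional.ofNullable(parameters.remove("TransformJobArn"))
            .map(jobArn -> jobArn.substring(jobArn.lastIndexOf('/') + 1))
            .ifPresent(name -> parameters.put("TransformJobName", name));
    }

    public static void main(String[] args) {
        Map<String, String> parameters = new HashMap<>();
        parameters.put("TransformJobArn", "arn:aws:sagemaker:us-east-1:123456789012:transform-job/my-job");
        replaceArnWithName(parameters);
        System.out.println(parameters);
    }
}
```

Extracting this into one method would also address the duplication the reviewer points out, since both call sites could share it.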

actionListener.onResponse(new MLCancelBatchJobResponse(RestStatus.OK));
} else {
log.debug("The status code from remote service is: " + modelOutput.getStatusCode());
actionListener.onFailure(new ResourceNotFoundException("Couldn't cancel the transform job. Please try again"));
Collaborator

Why use ResourceNotFoundException here? It seems this could be any exception from the remote service, so we can just call actionListener.onFailure(new OpenSearchException(modelOutput.getStatusCode()));

Collaborator Author

Yeah makes sense

Map<String, Object> updatedTask = new HashMap<>();
updatedTask.put(REMOTE_JOB_FIELD, remoteJob);

if ((remoteJob.containsKey("status") && remoteJob.get("status").equals("completed"))
Collaborator

Is it possible that the key-value pair in remoteJobStatus from the remote service is not "status" -> "completed" || "Completed", but uses some other name or status value?

Collaborator Author

Yeah it can be other values like in progress, cancelled, etc
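A null-safe way to express the completion check discussed in this thread might look like the sketch below. The field names "status" and "TransformJobStatus" come from the diff under review; the class name, the method name, and the case-insensitive comparison are assumptions made for illustration, not the merged implementation.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class RemoteJobStatus {
    // Status field names seen in the diff: "status" and "TransformJobStatus".
    private static final List<String> STATUS_FIELDS = List.of("status", "TransformJobStatus");

    private RemoteJobStatus() {}

    /** Returns true only if one of the known status fields holds "completed" in any letter case. */
    static boolean isCompleted(Map<String, ?> remoteJob) {
        for (String field : STATUS_FIELDS) {
            Object value = remoteJob.get(field);
            // instanceof is false for null, so missing or null values are skipped safely.
            if (value instanceof String && "completed".equalsIgnoreCase((String) value)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isCompleted(Map.of("TransformJobStatus", "Completed")));
        System.out.println(isCompleted(Map.of("status", "in_progress")));
    }
}
```

Any other status value, such as in progress or cancelled, simply yields false here. A fuller version would map each remote status to the corresponding task state.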

Zhangxunmt previously approved these changes Sep 5, 2024
@ylwu-amzn
Collaborator

* What went wrong:
Execution failed for task ':opensearch-ml-plugin:spotlessJavaCheck'.
> The following files had format violations:
      src/main/java/org/opensearch/ml/action/tasks/GetTaskTransportAction.java
          @@ -13,7 +13,6 @@
           import·static·org.opensearch.ml.common.MLTaskState.CANCELLED;
           import·static·org.opensearch.ml.common.MLTaskState.COMPLETED;
           import·static·org.opensearch.ml.common.connector.ConnectorAction.ActionType.BATCH_PREDICT_STATUS;
          -import·static·org.opensearch.ml.common.conversation.ConversationalIndexConstants.META_INDEX_NAME;
           import·static·org.opensearch.ml.utils.MLExceptionUtils.logException;
           import·static·org.opensearch.ml.utils.MLNodeUtils.createXContentParserFromRegistry;
           
          @@ -23,8 +22,6 @@
           
           import·org.opensearch.OpenSearchException;
           import·org.opensearch.OpenSearchStatusException;
          -import·org.opensearch.OpenSearchWrapperException;
          -import·org.opensearch.ResourceAlreadyExistsException;
           import·org.opensearch.ResourceNotFoundException;
           import·org.opensearch.action.ActionRequest;
           import·org.opensearch.action.get.GetRequest;
  Run './gradlew :opensearch-ml-plugin:spotlessApply' to fix these violations.

@ylwu-amzn
Collaborator

REPRODUCE WITH: gradlew ':opensearch-ml-plugin:test' --tests "org.opensearch.ml.action.model_group.UpdateModelGroupITTests.test_update_public_model_group" -Dtests.seed=ACF849029908FA76 -Dtests.security.manager=false -Dtests.locale=ar-PS -Dtests.timezone=Asia/Taipei -Druntime.java=21
UpdateModelGroupITTests > test_update_public_model_group FAILED
    java.lang.AssertionError: 
    Expected: an instance of java.lang.IllegalArgumentException
         but: <org.junit.internal.runners.model.MultipleFailureException: There were ? errors:
      java.lang.IllegalArgumentException(You cannot specify model access control parameters because the Security plugin or model access control is disabled on your cluster.)

@rbhavna rbhavna temporarily deployed to ml-commons-cicd-env September 5, 2024 02:15 — with GitHub Actions Inactive
@rbhavna rbhavna temporarily deployed to ml-commons-cicd-env September 5, 2024 14:31 — with GitHub Actions Inactive
@rbhavna rbhavna temporarily deployed to ml-commons-cicd-env September 5, 2024 16:24 — with GitHub Actions Inactive
@rbhavna rbhavna had a problem deploying to ml-commons-cicd-env September 5, 2024 16:24 — with GitHub Actions Failure
@Zhangxunmt Zhangxunmt merged commit 8da7bd2 into opensearch-project:main Sep 5, 2024
3 of 5 checks passed
opensearch-trigger-bot bot pushed a commit that referenced this pull request Sep 5, 2024
* support get batch transform job status in get task API

Signed-off-by: Bhavana Ramaram <[email protected]>

* add cancel batch prediction job API for offline inference

Signed-off-by: Bhavana Ramaram <[email protected]>

* add unit tests and address comments

Signed-off-by: Bhavana Ramaram <[email protected]>

* stash context for get model

Signed-off-by: Bhavana Ramaram <[email protected]>

* apply spotlessJava and exclude from test coverage

Signed-off-by: Bhavana Ramaram <[email protected]>

---------

Signed-off-by: Bhavana Ramaram <[email protected]>
(cherry picked from commit 8da7bd2)
Zhangxunmt pushed a commit that referenced this pull request Sep 5, 2024
* support get batch transform job status in get task API

Signed-off-by: Bhavana Ramaram <[email protected]>

* add cancel batch prediction job API for offline inference

Signed-off-by: Bhavana Ramaram <[email protected]>

* add unit tests and address comments

Signed-off-by: Bhavana Ramaram <[email protected]>

* stash context for get model

Signed-off-by: Bhavana Ramaram <[email protected]>

* apply spotlessJava and exclude from test coverage

Signed-off-by: Bhavana Ramaram <[email protected]>

---------

Signed-off-by: Bhavana Ramaram <[email protected]>
(cherry picked from commit 8da7bd2)

Co-authored-by: Bhavana Ramaram <[email protected]>
3 participants