-
Notifications
You must be signed in to change notification settings - Fork 145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support get batch transform job status in get task API #2825
Conversation
CANCEL_BATCH, | ||
BATCH_STATUS; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest rename these two action type to CANCEL_BATCH_PREDICT and BATCH_PREDICT_STATUS for sake of consistency and extensibility. It's possible that other batch action like batch_ingest will be added in the future so only using batch may look confused.
} | ||
|
||
private void processRemoteBatchPrediction(MLTask mlTask, String taskId, ActionListener<MLTaskGetResponse> actionListener) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this empty line seems redundant.
@@ -275,6 +314,9 @@ public static MLTask parse(XContentParser parser) throws IOException { | |||
case IS_ASYNC_TASK_FIELD: | |||
async = parser.booleanValue(); | |||
break; | |||
case TRANSFORM_JOB_FIELD: | |||
transformJob = parser.map(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
later in the transport action. It's required all the values in this transformJob map is a String. Why not use parser.mapString() directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No the values of transformJob are objects, not strings
Map<String, String> parameters = new HashMap<>(); | ||
for (Map.Entry<String, ?> entry : transformJob.entrySet()) { | ||
if (entry.getValue() instanceof String) { | ||
parameters.put(entry.getKey(), (String) entry.getValue()); | ||
} else { | ||
log.debug("Value for key " + entry.getKey() + " is not a String"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This component can be moved to the StringUtils class for better sharing and reusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure will do it
.getDataAsMap(); | ||
if (dataAsMap != null | ||
&& (dataAsMap.containsKey("TransformJobArn") || dataAsMap.containsKey("id"))) { | ||
transformJob.putAll(dataAsMap); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can just convert all the Object to String for easier data handling in the Get Task action, without any loss.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay. Let me try to make that change and verify
ActionListener<Connector> listener = ActionListener.wrap(connector -> { | ||
connector.decrypt(BATCH_STATUS.name(), (credential) -> encryptor.decrypt(credential)); | ||
RemoteConnectorExecutor connectorExecutor = MLEngineClassLoader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is connector access controller included? If it's just one line change, I think we should still add this access check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean to check the user's access to the connector
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should then also add model access controller I guess
This would need the new action type "task status" in the connector actions body. Can you pls add one example in the test doc or this PR for this new action type? Also UTs are still missing for the GetTask Transport action. |
@@ -45,6 +51,8 @@ public class MLTask implements ToXContentObject, Writeable { | |||
public static final String LAST_UPDATE_TIME_FIELD = "last_update_time"; | |||
public static final String ERROR_FIELD = "error"; | |||
public static final String IS_ASYNC_TASK_FIELD = "is_async"; | |||
public static final String TRANSFORM_JOB_FIELD = "transform_job"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transform_job
seems only limited to Sagemaker, how about changing to remote_job
? cc @Zhangxunmt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"remote job" sounds good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure makes sense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about remote_batch_job
or batch_job
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think not all remote jobs are for batching purpose
out.writeBoolean(true); | ||
try { | ||
AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> { | ||
out.writeString(gson.toJson(transformJob)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use out.writeMap(..)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
output.writeMap(transformJob, StreamOutput::writeString, StreamOutput::writeGenericValue)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For Map<String, Object> type? I am trying to change transformJob's type to Map<String, String> but getting some exceptions while testing
} | ||
} | ||
|
||
if (parameters.containsKey("TransformJobArn") && parameters.get("TransformJobArn") != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some comments?
I think TransformJobArn
and TransformJobArn
only for Sagemaker.
}); | ||
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) { | ||
connectorAccessControlHelper | ||
.getConnector(client, model.getConnectorId(), ActionListener.runBefore(listener, threadContext::restore)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also consider internal connector in model? There will be no connector id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the catch. Let me add that case too
updatedTask.put(TRANSFORM_JOB_FIELD, transformJob); | ||
|
||
if ((transformJob.containsKey("status") && transformJob.get("status").equals("completed")) | ||
|| (transformJob.containsKey("TransformJobStatus") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Such parsing logic looks only limited to Sagemaker, right ? Does this PR only support Sagemaker ? Suggest add some comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its both sagemaker and OpenAI. Will add more comments
2e6186b
to
2e616f0
Compare
2e616f0
to
b6be1f5
Compare
if (streamOutputVersion.onOrAfter(MLTask.MINIMAL_SUPPORTED_VERSION_FOR_BATCH_TRANSFORM_JOB)) { | ||
if (remoteJob != null) { | ||
out.writeBoolean(true); | ||
try { | ||
AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> { | ||
out.writeString(gson.toJson(remoteJob)); | ||
return null; | ||
}); | ||
} catch (PrivilegedActionException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} else { | ||
out.writeBoolean(false); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use these two lines to replace this code block.
out.writeBoolean(true);
out.writeMap(remoteJob, StreamOutput::writeString, StreamOutput::writeGenericValue);
if (input.readBoolean()) { | ||
String mapStr = input.readString(); | ||
this.remoteJob = gson.fromJson(mapStr, Map.class); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.remoteJob = input.readMap(s -> s.readString(), s -> s.readGenericValue());
Why not use this for simplicity?
Map<String, String> parameters = new HashMap<>(); | ||
for (Map.Entry<String, ?> entry : remoteJob.entrySet()) { | ||
if (entry.getValue() instanceof String) { | ||
parameters.put(entry.getKey(), (String) entry.getValue()); | ||
} else { | ||
log.debug("Value for key " + entry.getKey() + " is not a String"); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like the remoteJob has to be <String, String>? If that's the case, why not throw exception when this remoteJob is created inside MLTask in the first place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah it seems easier when remote job is Map<String, String> while creating itself. Will make that change to avoid all this confusion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When converting to Map<String, String>, the values that are not strings will be lost. User may not be able to see complete response from remote job. I guess it is safe to use Map<String, Object> so that all key-value pairs from remote service are retained
if (parameters.containsKey("TransformJobArn") && parameters.get("TransformJobArn") != null) { | ||
String jobArn = parameters.get("TransformJobArn"); | ||
String transformJobName = jobArn.substring(jobArn.lastIndexOf("/") + 1); | ||
parameters.put("TransformJobName", transformJobName); | ||
parameters.remove("TransformJobArn"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parameters.put("TransformJobName", Optional.ofNullable(parameters.remove("TransformJobArn"))
.map(jobArn -> jobArn.substring(jobArn.lastIndexOf("/") + 1))
.orElse(null));
This block is used twice in the PR. Consider replacing it with this single statement to avoid code duplication.
actionListener.onResponse(new MLCancelBatchJobResponse(RestStatus.OK)); | ||
} else { | ||
log.debug("The status code from remote service is: " + modelOutput.getStatusCode()); | ||
actionListener.onFailure(new ResourceNotFoundException("Couldn't cancel the transform job. Please try again")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why use ResourceNotFoundException here? It seems like that it can be any Exception from remote service. So we can just actionListener.onFailure(new OpenSearchException(modelOutput.getStatusCode()));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah makes sense
Map<String, Object> updatedTask = new HashMap<>(); | ||
updatedTask.put(REMOTE_JOB_FIELD, remoteJob); | ||
|
||
if ((remoteJob.containsKey("status") && remoteJob.get("status").equals("completed")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that the key-value pair in remoteJobStatus from remote service is not "status" -> "completed" || "Completed", but some other names or status?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah it can be other values like in progress, cancelled, etc
|
|
Signed-off-by: Bhavana Ramaram <[email protected]>
Signed-off-by: Bhavana Ramaram <[email protected]>
Signed-off-by: Bhavana Ramaram <[email protected]>
Signed-off-by: Bhavana Ramaram <[email protected]>
Signed-off-by: Bhavana Ramaram <[email protected]>
a5e59a9
to
8460349
Compare
* support get batch transform job status in get task API Signed-off-by: Bhavana Ramaram <[email protected]> * add cancel batch prediction job API for offline inference Signed-off-by: Bhavana Ramaram <[email protected]> * add unit tests and address comments Signed-off-by: Bhavana Ramaram <[email protected]> * stash context for get model Signed-off-by: Bhavana Ramaram <[email protected]> * apply spotlessJava and exclude from test coverage Signed-off-by: Bhavana Ramaram <[email protected]> --------- Signed-off-by: Bhavana Ramaram <[email protected]> (cherry picked from commit 8da7bd2)
* support get batch transform job status in get task API Signed-off-by: Bhavana Ramaram <[email protected]> * add cancel batch prediction job API for offline inference Signed-off-by: Bhavana Ramaram <[email protected]> * add unit tests and address comments Signed-off-by: Bhavana Ramaram <[email protected]> * stash context for get model Signed-off-by: Bhavana Ramaram <[email protected]> * apply spotlessJava and exclude from test coverage Signed-off-by: Bhavana Ramaram <[email protected]> --------- Signed-off-by: Bhavana Ramaram <[email protected]> (cherry picked from commit 8da7bd2)
* support get batch transform job status in get task API Signed-off-by: Bhavana Ramaram <[email protected]> * add cancel batch prediction job API for offline inference Signed-off-by: Bhavana Ramaram <[email protected]> * add unit tests and address comments Signed-off-by: Bhavana Ramaram <[email protected]> * stash context for get model Signed-off-by: Bhavana Ramaram <[email protected]> * apply spotlessJava and exclude from test coverage Signed-off-by: Bhavana Ramaram <[email protected]> --------- Signed-off-by: Bhavana Ramaram <[email protected]> (cherry picked from commit 8da7bd2) Co-authored-by: Bhavana Ramaram <[email protected]>
* support get batch transform job status in get task API Signed-off-by: Bhavana Ramaram <[email protected]> * add cancel batch prediction job API for offline inference Signed-off-by: Bhavana Ramaram <[email protected]> * add unit tests and address comments Signed-off-by: Bhavana Ramaram <[email protected]> * stash context for get model Signed-off-by: Bhavana Ramaram <[email protected]> * apply spotlessJava and exclude from test coverage Signed-off-by: Bhavana Ramaram <[email protected]> --------- Signed-off-by: Bhavana Ramaram <[email protected]> (cherry picked from commit 8da7bd2) Co-authored-by: Bhavana Ramaram <[email protected]>
Description
support get batch transform job status in get task API
//TODO: Will be adding UTs and ITs in next revision
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
--signoff
.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.