Skip to content

Commit

Permalink
Adding boolean argument to ML Commons API (#143) (#155)
Browse files Browse the repository at this point in the history
* adding boolean argument

Signed-off-by: Alibi Zhenis <[email protected]>

* changing test

Signed-off-by: Alibi Zhenis <[email protected]>

* adding load_model argument to upload_model

Signed-off-by: Alibi Zhenis <[email protected]>

* small fix

Signed-off-by: Alibi Zhenis <[email protected]>

* adding default value of load_model argument

Signed-off-by: Alibi Zhenis <[email protected]>

---------

Signed-off-by: Alibi Zhenis <[email protected]>
(cherry picked from commit 5bba4a6)

Co-authored-by: Alibi Zhenis <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and AlibiZhenis authored Apr 27, 2023
1 parent eb54d9c commit a81b524
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 64 deletions.
1 change: 1 addition & 0 deletions opensearch_py_ml/ml_commons/ml_common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
MODEL_UPLOAD_CHUNK_SIZE = 10_000_000
MODEL_MAX_SIZE = 4_000_000_000
BUF_SIZE = 65536  # let's read stuff in 64 KB chunks!
TIMEOUT = 120 # timeout for synchronous method calls in seconds
87 changes: 53 additions & 34 deletions opensearch_py_ml/ml_commons/ml_commons_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from opensearchpy import OpenSearch

from opensearch_py_ml.ml_commons.ml_common_utils import ML_BASE_URI
from opensearch_py_ml.ml_commons.ml_common_utils import ML_BASE_URI, TIMEOUT
from opensearch_py_ml.ml_commons.model_uploader import ModelUploader


Expand All @@ -30,6 +30,8 @@ def upload_model(
model_path: str,
model_config_path: str,
isVerbose: bool = False,
load_model: bool = True,
wait_until_loaded: bool = True,
) -> str:
"""
This method uploads model into opensearch cluster using ml-common plugin's api.
Expand All @@ -56,19 +58,30 @@ def upload_model(
:type model_config_path: string
:param isVerbose: if isVerbose is true method will print more messages. default False
:type isVerbose: boolean
:param load_model: Whether to load the model in memory using uploaded model chunks
:type load_model: bool
:param wait_until_loaded: If load_model is true, whether to wait until the model is loaded into memory
:type wait_until_loaded: bool
:return: returns the model_id so that we can use this for further operation.
:rtype: string
"""
return self._model_uploader._upload_model(
model_id = self._model_uploader._upload_model(
model_path, model_config_path, isVerbose
)

# loading the model chunks from model index
if load_model:
self.load_model(model_id, wait_until_loaded=wait_until_loaded)

return model_id

def upload_pretrained_model(
self,
model_name: str,
model_version: str,
model_format: str,
load_model: bool = True,
wait_until_loaded: bool = True,
):
"""
This method uploads a pretrained model into opensearch cluster using ml-common plugin's api.
Expand All @@ -84,6 +97,8 @@ def upload_pretrained_model(
:type model_format: string
:param load_model: Whether to load the model in memory using uploaded model chunks
:type load_model: bool
:param wait_until_loaded: If load_model is true, whether to wait until the model is loaded into memory
:type wait_until_loaded: bool
:return: returns the model_id so that we can use this for further operation
:rtype: string
"""
Expand All @@ -97,20 +112,7 @@ def upload_pretrained_model(

# loading the model chunks from model index
if load_model:
self.load_model(model_id)
for i in range(120): # timeout is 120 seconds
time.sleep(1)
ml_model_status = self.get_model_info(model_id)
model_state = ml_model_status.get("model_state")
if model_state != "LOADING":
break

if model_state == "LOADED":
print("Model loaded into memory successfully")
elif model_state == "PARTIALLY_LOADED":
print("Model was loaded into memory only partially")
else:
raise Exception("Model load failed")
self.load_model(model_id, wait_until_loaded=wait_until_loaded)

return model_id

Expand All @@ -128,7 +130,7 @@ def _send_model_info(self, model_meta_json: dict):
url=f"{ML_BASE_URI}/models/_upload",
body=model_meta_json,
)
end = time.time() + 120 # timeout seconds
end = time.time() + TIMEOUT # timeout seconds
task_flag = False
while not task_flag or time.time() < end:
time.sleep(1)
Expand All @@ -142,22 +144,42 @@ def _send_model_info(self, model_meta_json: dict):
print("Model was uploaded successfully. Model Id: ", status["model_id"])
return status["model_id"]

def load_model(self, model_id: str) -> object:
def load_model(self, model_id: str, wait_until_loaded: bool = True) -> object:
"""
This method loads model into opensearch cluster using ml-common plugin's load model api
:param model_id: unique id of the model
:type model_id: string
:param wait_until_loaded: Whether to wait until the model is loaded into memory
:type wait_until_loaded: bool
:return: returns a json object, with task_id and status key.
:rtype: object
"""

API_URL = f"{ML_BASE_URI}/models/{model_id}/_load"

return self._client.transport.perform_request(
method="POST",
url=API_URL,
)
task_id = self._client.transport.perform_request(method="POST", url=API_URL)[
"task_id"
]

if wait_until_loaded:
# Wait until loaded
for i in range(TIMEOUT):
ml_model_status = self.get_model_info(model_id)
model_state = ml_model_status.get("model_state")
if model_state in ["LOADED", "PARTIALLY_LOADED"]:
break
time.sleep(1)

# Check the model status
if model_state == "LOADED":
print("Model loaded into memory successfully")
elif model_state == "PARTIALLY_LOADED":
print("Model was loaded into memory only partially")
else:
raise Exception("Model load failed")

return self._get_task_info(task_id)

def get_task_info(self, task_id: str, wait_until_task_done: bool = False) -> object:
"""
Expand All @@ -173,7 +195,7 @@ def get_task_info(self, task_id: str, wait_until_task_done: bool = False) -> obj
:rtype: object
"""
if wait_until_task_done:
end = time.time() + 120 # timeout seconds
end = time.time() + TIMEOUT # timeout seconds
task_flag = False
while not task_flag or time.time() < end:
time.sleep(1)
Expand Down Expand Up @@ -248,18 +270,15 @@ def unload_model(self, model_id: str, node_ids: List[str] = []) -> object:

API_URL = f"{ML_BASE_URI}/models/{model_id}/_unload"

API_BODY = {"node_ids": node_ids}
API_BODY = {}
if len(node_ids) > 0:
return self._client.transport.perform_request(
method="POST",
url=API_URL,
body=API_BODY,
)
else:
return self._client.transport.perform_request(
method="POST",
url=API_URL,
)
API_BODY["node_ids"] = node_ids

return self._client.transport.perform_request(
method="POST",
url=API_URL,
body=API_BODY,
)

def delete_model(self, model_id: str) -> object:
"""
Expand Down
49 changes: 19 additions & 30 deletions tests/ml_commons/test_ml_commons_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import os
import shutil
import time
from os.path import exists

from opensearchpy import OpenSearch
Expand Down Expand Up @@ -45,8 +44,6 @@
PRETRAINED_MODEL_VERSION = "1.0.1"
PRETRAINED_MODEL_FORMAT = "TORCH_SCRIPT"

UNLOAD_TIMEOUT = 300 # in seconds


def clean_test_folder(TEST_FOLDER):
if os.path.exists(TEST_FOLDER):
Expand Down Expand Up @@ -84,7 +81,10 @@ def test_integration_pretrained_model_upload_unload_delete():
model_version=PRETRAINED_MODEL_VERSION,
model_format=PRETRAINED_MODEL_FORMAT,
load_model=True,
wait_until_loaded=True,
)
ml_model_status = ml_client.get_model_info(model_id)
assert ml_model_status.get("model_state") != "LOAD_FAILED"
except: # noqa: E722
raised = True
assert raised == False, "Raised Exception during pretrained model upload and load"
Expand All @@ -102,11 +102,6 @@ def test_integration_pretrained_model_upload_unload_delete():
raised = False
try:
ml_client.unload_model(model_id)
for i in range(int(UNLOAD_TIMEOUT / 10)):
ml_model_status = ml_client.get_model_info(model_id)
if ml_model_status.get("model_state") == "UNLOADED":
break
time.sleep(10)
ml_model_status = ml_client.get_model_info(model_id)
assert ml_model_status.get("model_state") == "UNLOADED"
except: # noqa: E722
Expand Down Expand Up @@ -142,7 +137,7 @@ def test_integration_model_train_upload_full_cycle():
task_id = ""
try:
model_id = ml_client.upload_model(
MODEL_PATH, MODEL_CONFIG_FILE_PATH, isVerbose=True
MODEL_PATH, MODEL_CONFIG_FILE_PATH, load_model=False, isVerbose=True
)
print("Model_id:", model_id)
except: # noqa: E722
Expand All @@ -152,10 +147,12 @@ def test_integration_model_train_upload_full_cycle():
if model_id:
raised = False
try:
ml_load_status = ml_client.load_model(model_id)
# assert ml_load_status.get("status") == "CREATED"
ml_load_status = ml_client.load_model(model_id, wait_until_loaded=False)
task_id = ml_load_status.get("task_id")
assert task_id != "" or task_id is not None

ml_model_status = ml_client.get_model_info(model_id)
assert ml_model_status.get("model_state") != "LOAD_FAILED"
except: # noqa: E722
raised = True
assert raised == False, "Raised Exception in loading model"
Expand Down Expand Up @@ -186,20 +183,17 @@ def test_integration_model_train_upload_full_cycle():
# This test is flaky. Sometimes it passes and sometimes it fails with a 500 error
# due to the memory circuit breaker.
# Todo: We need to revisit this test.
if ml_task_status.get("state") == "LOADED":
try:
raised = False
sentences = ["First test sentence", "Second test sentence"]
embedding_result = ml_client.generate_embedding(
model_id, sentences
)
print(embedding_result)
assert len(embedding_result.get("inference_results")) == 2
except: # noqa: E722
raised = True
assert (
raised == False
), "Raised Exception in generating sentence embedding"
try:
raised = False
sentences = ["First test sentence", "Second test sentence"]
embedding_result = ml_client.generate_embedding(model_id, sentences)
print(embedding_result)
assert len(embedding_result.get("inference_results")) == 2
except: # noqa: E722
raised = True
assert (
raised == False
), "Raised Exception in generating sentence embedding"

try:
delete_task_obj = ml_client.delete_task(task_id)
Expand All @@ -210,11 +204,6 @@ def test_integration_model_train_upload_full_cycle():

try:
ml_client.unload_model(model_id)
for i in range(int(UNLOAD_TIMEOUT / 10)):
ml_model_status = ml_client.get_model_info(model_id)
if ml_model_status.get("model_state") == "UNLOADED":
break
time.sleep(10)
ml_model_status = ml_client.get_model_info(model_id)
assert ml_model_status.get("model_state") == "UNLOADED"
except: # noqa: E722
Expand Down

0 comments on commit a81b524

Please sign in to comment.